diff --git a/docs/en/12-taos-sql/24-show.md b/docs/en/12-taos-sql/24-show.md index e2aff7a878..1f340cab30 100644 --- a/docs/en/12-taos-sql/24-show.md +++ b/docs/en/12-taos-sql/24-show.md @@ -189,7 +189,7 @@ show table distributed d0\G; Show Example

 *************************** 1.row ***************************
-_block_dist: Total_Blocks=[5] Total_Size=[93.65 Kb] Average_size=[18.73 Kb] Compression_Ratio=[23.98 %]
+_block_dist: Total_Blocks=[5] Total_Size=[93.65 KB] Average_size=[18.73 KB] Compression_Ratio=[23.98 %]
 
 Total_Blocks :  Table `d0` contains total 5 blocks
 
diff --git a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs
index 7551ad46b1..d54bb60e93 100644
--- a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs
+++ b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs
@@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
     taos.exec_many([
         format!("DROP TOPIC IF EXISTS tmq_meters"),
         format!("DROP DATABASE IF EXISTS `{db}`"),
-        format!("CREATE DATABASE `{db}`"),
+        format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"),
         format!("USE `{db}`"),
         // create super table
         format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
diff --git a/docs/zh/12-taos-sql/24-show.md b/docs/zh/12-taos-sql/24-show.md
index c85efa2376..ab29a1ee50 100644
--- a/docs/zh/12-taos-sql/24-show.md
+++ b/docs/zh/12-taos-sql/24-show.md
@@ -189,7 +189,7 @@ SHOW TABLE DISTRIBUTED table_name;
 
 *************************** 1.row ***************************
 
-_block_dist: Total_Blocks=[5] Total_Size=[93.65 Kb] Average_size=[18.73 Kb] Compression_Ratio=[23.98 %]
+_block_dist: Total_Blocks=[5] Total_Size=[93.65 KB] Average_size=[18.73 KB] Compression_Ratio=[23.98 %]
 
 Total_Blocks:  表 d0 占用的 block 个数为 5 个
 
diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h
index 09198aa038..45050e5d64 100644
--- a/include/libs/wal/wal.h
+++ b/include/libs/wal/wal.h
@@ -132,7 +132,7 @@ typedef struct {
 } SWalRef;
 
 typedef struct {
-  int8_t scanUncommited;
+//  int8_t scanUncommited;
   int8_t scanNotApplied;
   int8_t scanMeta;
   int8_t enableRef;
diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c
index e64e749b5b..c8fb5f7bd8 100644
--- a/source/client/src/clientSml.c
+++ b/source/client/src/clientSml.c
@@ -1580,7 +1580,9 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
     code = smlModifyDBSchemas(info);
     if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS
         || code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH
-        || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH) break;
+        || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) {
+      break;
+    }
     taosMsleep(100);
     uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
   } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index a73c08e69a..8b708c3e0f 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -932,7 +932,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
     return -1;
   }
 
-  if(pDst->nextColId < 0 && pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
+  if(pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
     terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
     return -1;
   }
@@ -1163,8 +1163,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
   if (mndAllocStbSchemas(pOld, pNew) != 0) {
     return -1;
   }
-
-  if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ntags){
+ 
+  if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags){
     terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
     return -1;
   }
@@ -1476,7 +1476,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
     return -1;
   }
 
-  if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ncols){
+  if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols){
     terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
     return -1;
   }
diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c
index f4d6e27dea..e62102fa77 100644
--- a/source/dnode/mnode/impl/src/mndSubscribe.c
+++ b/source/dnode/mnode/impl/src/mndSubscribe.c
@@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
 
 static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
                                         const SMqRebOutputVg *pRebVg) {
-  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
-    terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
-    return -1;
-  }
+//  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
+//    terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
+//    return -1;
+//  }
 
   void   *buf;
   int32_t tlen;
@@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
   }
 }
 
+static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){
+  for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){
+    SMqVgEp       *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
+    SMqRebOutputVg outputVg = {
+        .oldConsumerId = pConsumerEp->consumerId,
+        .newConsumerId = pConsumerEp->consumerId,
+        .pVgEp = pVgEp,
+    };
+    taosArrayPush(pOutput->rebVgs, &outputVg);
+  }
+}
+
 static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                         int32_t imbConsumerNum) {
   const char *pSubKey = pOutput->pSub->key;
@@ -290,24 +302,19 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
     taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
     if (consumerVgNum > minVgCnt) {
       if (imbCnt < imbConsumerNum) {
-        if (consumerVgNum == minVgCnt + 1) {
-          imbCnt++;
-          continue;
-        } else {
-          // pop until equal minVg + 1
-          while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
-            SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
-            SMqRebOutputVg outputVg = {
-                .oldConsumerId = pConsumerEp->consumerId,
-                .newConsumerId = -1,
-                .pVgEp = pVgEp,
-            };
-            taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
-            mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
-                  pConsumerEp->consumerId);
-          }
-          imbCnt++;
+        // pop until equal minVg + 1
+        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
+          SMqVgEp       *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
+          SMqRebOutputVg outputVg = {
+              .oldConsumerId = pConsumerEp->consumerId,
+              .newConsumerId = -1,
+              .pVgEp = pVgEp,
+          };
+          taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
+          mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
+                pConsumerEp->consumerId);
         }
+        imbCnt++;
       } else {
         // all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
         while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
@@ -323,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
         }
       }
     }
+    putNoTransferToOutput(pOutput, pConsumerEp);
   }
 }
 
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index bbdfd715b5..a0f913ec10 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -445,6 +445,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
 }
 
 int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
+  int ret = 0;
   SMqRebVgReq req = {0};
   tDecodeSMqRebVgReq(msg, &req);
 
@@ -463,8 +464,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
 
     if (req.newConsumerId == -1) {
       tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
-      taosMemoryFree(req.qmsg);
-      return 0;
+      goto end;
     }
 
     STqHandle tqHandle = {0};
@@ -481,8 +481,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
     // TODO version should be assigned and refed during preprocess
     SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
     if (pRef == NULL) {
-      taosMemoryFree(req.qmsg);
-      return -1;
+      ret = -1;
+      goto end;
     }
 
     int64_t ver = pRef->refVer;
@@ -534,49 +534,42 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
     taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
     tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
             pHandle->consumerId, oldConsumerId);
-    if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
-      taosMemoryFree(req.qmsg);
-      return -1;
-    }
+    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
+    goto end;
   } else {
     if (pHandle->consumerId == req.newConsumerId) {  // do nothing
       tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
-      atomic_store_32(&pHandle->epoch, -1);
       atomic_add_fetch_32(&pHandle->epoch, 1);
-      taosMemoryFree(req.qmsg);
-      return tqMetaSaveHandle(pTq, req.subKey, pHandle);
+
     } else {
       tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
              req.newConsumerId);
-
-      // kill executing task
-      qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
-      if (pTaskInfo != NULL) {
-        qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
-      }
-
-      taosWLockLatch(&pTq->lock);
-      atomic_store_32(&pHandle->epoch, 0);
-
-      // remove if it has been register in the push manager, and return one empty block to consumer
-      tqUnregisterPushHandle(pTq, pHandle);
-
       atomic_store_64(&pHandle->consumerId, req.newConsumerId);
-
-      if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
-        qStreamCloseTsdbReader(pTaskInfo);
-      }
-
-      taosWUnLockLatch(&pTq->lock);
-      if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
-        taosMemoryFree(req.qmsg);
-        return -1;
-      }
+      atomic_store_32(&pHandle->epoch, 0);
     }
+    // kill executing task
+    qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
+    if (pTaskInfo != NULL) {
+      qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
+    }
+
+    taosWLockLatch(&pTq->lock);
+    // remove if it has been register in the push manager, and return one empty block to consumer
+    tqUnregisterPushHandle(pTq, pHandle);
+
+
+    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
+      qStreamCloseTsdbReader(pTaskInfo);
+    }
+
+    taosWUnLockLatch(&pTq->lock);
+    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
+    goto end;
   }
 
+end:
   taosMemoryFree(req.qmsg);
-  return 0;
+  return ret;
 }
 
 int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c
index 9ee8e667db..b956027741 100644
--- a/source/dnode/vnode/src/tq/tqRestore.c
+++ b/source/dnode/vnode/src/tq/tqRestore.c
@@ -120,6 +120,8 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
         continue;
       }
 
+
+
       // append the data for the stream
       tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
     } else {
diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c
index e3bde14b6d..a66d63a910 100644
--- a/source/dnode/vnode/src/tq/tqUtil.c
+++ b/source/dnode/vnode/src/tq/tqUtil.c
@@ -165,12 +165,19 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
                                                    SRpcMsg* pMsg, STqOffsetVal* pOffset) {
   uint64_t consumerId = pRequest->consumerId;
   int32_t  vgId = TD_VID(pTq->pVnode);
+  int      code = 0;
 
   SMqDataRsp dataRsp = {0};
   tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
+  qTaskInfo_t task = pHandle->execHandle.task;
+  if(qTaskIsExecuting(task)){
+    code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
+    tDeleteSMqDataRsp(&dataRsp);
+    return code;
+  }
 
   qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
-  int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
+  code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
   if(code != 0) {
     goto end;
   }
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index b7ec7bb5c5..c04a23d71f 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -4855,7 +4855,11 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
   qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
 
   if (pReader->flag == READER_STATUS_SUSPEND) {
-    tsdbReaderResume(pReader);
+    code = tsdbReaderResume(pReader);
+    if (code != TSDB_CODE_SUCCESS) {
+      tsdbReleaseReader(pReader);
+      return code;
+    }
   }
 
   if (pReader->innerReader[0] != NULL && pReader->step == 0) {
@@ -5133,11 +5137,17 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
 }
 
 int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
+  int32_t code = TSDB_CODE_SUCCESS;
+
   qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
   tsdbAcquireReader(pReader);
 
   if (pReader->flag == READER_STATUS_SUSPEND) {
-    tsdbReaderResume(pReader);
+    code = tsdbReaderResume(pReader);
+    if (code != TSDB_CODE_SUCCESS) {
+      tsdbReleaseReader(pReader);
+      return code;
+    }
   }
 
   if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
@@ -5172,8 +5182,6 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
   int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1;
   resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
 
-  int32_t code = 0;
-
   // no data in files, let's try buffer in memory
   if (pStatus->fileIter.numOfFiles == 0) {
     pStatus->loadFromFile = false;
@@ -5218,7 +5226,11 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
   // find the start data block in file
   tsdbAcquireReader(pReader);
   if (pReader->flag == READER_STATUS_SUSPEND) {
-    tsdbReaderResume(pReader);
+    code = tsdbReaderResume(pReader);
+    if (code != TSDB_CODE_SUCCESS) {
+      tsdbReleaseReader(pReader);
+      return code;
+    }
   }
   SReaderStatus* pStatus = &pReader->status;
 
@@ -5286,12 +5298,17 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
 }
 
 int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
+  int32_t code = TSDB_CODE_SUCCESS;
   int64_t rows = 0;
 
   SReaderStatus* pStatus = &pReader->status;
   tsdbAcquireReader(pReader);
   if (pReader->flag == READER_STATUS_SUSPEND) {
-    tsdbReaderResume(pReader);
+    code = tsdbReaderResume(pReader);
+    if (code != TSDB_CODE_SUCCESS) {
+      tsdbReleaseReader(pReader);
+      return code;
+    }
   }
 
   int32_t iter = 0;
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index 0006ce1f27..af318a6bc5 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -5572,7 +5572,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
   }
 
   int32_t len = sprintf(st + VARSTR_HEADER_SIZE,
-                        "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]",
+                        "Total_Blocks=[%d] Total_Size=[%.2f KB] Average_size=[%.2f KB] Compression_Ratio=[%.2f %c]",
                         pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
 
   varDataSetLen(st, len);
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index 5767df4973..d1bd47b884 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -6130,17 +6130,50 @@ static bool isEventWindowQuery(SSelectStmt* pSelect) {
   return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow);
 }
 
+static bool hasJsonTypeProjection(SSelectStmt* pSelect) {
+  SNode* pProj = NULL;
+  FOREACH(pProj, pSelect->pProjectionList) {
+    if (TSDB_DATA_TYPE_JSON == ((SExprNode*)pProj)->resType.type) {
+      return true;
+    }
+  }
+  return false;
+}
+
+static EDealRes hasColumnOrPseudoColumn(SNode* pNode, void* pContext) {
+  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
+    *(bool*)pContext = true;
+    return DEAL_RES_END;
+  }
+  if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)) {
+    *(bool*)pContext = true;
+    return DEAL_RES_END;
+  }
+  return DEAL_RES_CONTINUE;
+}
+
+static int32_t subtableExprHasColumnOrPseudoColumn(SNode* pNode) {
+  bool hasColumn = false;
+  nodesWalkExprPostOrder(pNode, hasColumnOrPseudoColumn, &hasColumn);
+  return hasColumn;
+}
+
 static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
   SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
   if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
       !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
-      crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect)) {
+      crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect) || hasJsonTypeProjection(pSelect)) {
     return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
   }
   if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
     return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
                                    "SUBTABLE expression must be of VARCHAR type");
   }
+  if (NULL != pSelect->pSubtable && 0 == LIST_LENGTH(pSelect->pPartitionByList) && subtableExprHasColumnOrPseudoColumn(pSelect->pSubtable)) {
+    return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
+                                   "SUBTABLE expression must not has column when no partition by clause");
+  }
+
   if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) {
     return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
                                    "The trigger mode of non window query can only be AT_ONCE");
@@ -8269,6 +8302,11 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
     return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN);
   }
 
+  if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) ||
+      (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) {
+    return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
+  }
+
   if (TSDB_MAX_COLUMNS == pTableMeta->tableInfo.numOfColumns) {
     return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS);
   }
diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp
index b7ca944ebb..6a08193a39 100644
--- a/source/libs/parser/test/parInitialCTest.cpp
+++ b/source/libs/parser/test/parInitialCTest.cpp
@@ -920,6 +920,10 @@ TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
 
   run("CREATE STREAM s1 INTO st1 AS SELECT PERCENTILE(c1, 30) FROM t1 INTERVAL(10S)",
       TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC);
+  run("CREATE STREAM s2 INTO st1 AS SELECT ts, to_json('{c1:1}') FROM st1 PARTITION BY TBNAME",
+      TSDB_CODE_PAR_INVALID_STREAM_QUERY);
+  run("CREATE STREAM s3 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tbname)) "
+      "AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 INTERVAL(10S)", TSDB_CODE_PAR_INVALID_STREAM_QUERY);
 }
 
 /*
diff --git a/source/libs/scalar/test/CMakeLists.txt b/source/libs/scalar/test/CMakeLists.txt
index 32f5e098c5..caaf86264c 100644
--- a/source/libs/scalar/test/CMakeLists.txt
+++ b/source/libs/scalar/test/CMakeLists.txt
@@ -1,4 +1,4 @@
 enable_testing()
 
-#add_subdirectory(filter)
+add_subdirectory(filter)
 add_subdirectory(scalar)
diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp
index b59e89fe0d..51ee9b6570 100644
--- a/source/libs/scalar/test/filter/filterTests.cpp
+++ b/source/libs/scalar/test/filter/filterTests.cpp
@@ -33,6 +33,7 @@
 #include "os.h"
 
 #include "filter.h"
+#include "filterInt.h"
 #include "nodes.h"
 #include "scalar.h"
 #include "stub.h"
@@ -344,6 +345,7 @@ TEST(timerangeTest, greater_and_lower_not_strict) {
   nodesDestroyNode(logicNode1);
 }
 
+#if 0
 TEST(columnTest, smallint_column_greater_double_value) {
   SNode       *pLeft = NULL, *pRight = NULL, *opNode = NULL;
   int16_t      leftv[5] = {1, 2, 3, 4, 5};
@@ -1337,6 +1339,127 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) {
   nodesDestroyNode(logicNode1);
   blockDataDestroy(src);
 }
+#endif
+
+template 
+int32_t compareSignedWithUnsigned(SignedT l, UnsignedT r) {
+  if (l < 0) return -1;
+  auto l_uint64 = static_cast(l);
+  auto r_uint64 = static_cast(r);
+  if (l_uint64 < r_uint64) return -1;
+  if (l_uint64 > r_uint64) return 1;
+  return 0;
+}
+
+template 
+int32_t compareUnsignedWithSigned(UnsignedT l, SignedT r) {
+  if (r < 0) return 1;
+  auto l_uint64 = static_cast(l);
+  auto r_uint64 = static_cast(r);
+  if (l_uint64 < r_uint64) return -1;
+  if (l_uint64 > r_uint64) return 1;
+  return 0;
+}
+
+template 
+void doCompareWithValueRange_SignedWithUnsigned(__compar_fn_t fp) {
+  int32_t signedMin = -10, signedMax = 10;
+  int32_t unsignedMin = 0, unsignedMax = 10;
+  for (SignedT l = signedMin; l <= signedMax; ++l) {
+    for (UnsignedT r = unsignedMin; r <= unsignedMax; ++r) {
+      ASSERT_EQ(fp(&l, &r), compareSignedWithUnsigned(l, r));
+    }
+  }
+}
+
+template 
+void doCompareWithValueRange_UnsignedWithSigned(__compar_fn_t fp) {
+  int32_t signedMin = -10, signedMax = 10;
+  int32_t unsignedMin = 0, unsignedMax = 10;
+  for (UnsignedT l = unsignedMin; l <= unsignedMax; ++l) {
+    for (SignedT r = signedMin; r <= signedMax; ++r) {
+      ASSERT_EQ(fp(&l, &r), compareUnsignedWithSigned(l, r));
+    }
+  }
+}
+
+template 
+void doCompareWithValueRange_OnlyLeftType(__compar_fn_t fp, int32_t rType) {
+  switch (rType) {
+    case TSDB_DATA_TYPE_UTINYINT:
+      doCompareWithValueRange_SignedWithUnsigned(fp);
+      break;
+    case TSDB_DATA_TYPE_USMALLINT:
+      doCompareWithValueRange_SignedWithUnsigned(fp);
+      break;
+    case TSDB_DATA_TYPE_UINT:
+      doCompareWithValueRange_SignedWithUnsigned(fp);
+      break;
+    case TSDB_DATA_TYPE_UBIGINT:
+      doCompareWithValueRange_SignedWithUnsigned(fp);
+      break;
+    case TSDB_DATA_TYPE_TINYINT:
+      doCompareWithValueRange_UnsignedWithSigned(fp);
+      break;
+    case TSDB_DATA_TYPE_SMALLINT:
+      doCompareWithValueRange_UnsignedWithSigned(fp);
+      break;
+    case TSDB_DATA_TYPE_INT:
+      doCompareWithValueRange_UnsignedWithSigned(fp);
+      break;
+    case TSDB_DATA_TYPE_BIGINT:
+      doCompareWithValueRange_UnsignedWithSigned(fp);
+      break;
+    default:
+      FAIL();
+  }
+}
+
+void doCompare(const std::vector &lTypes, const std::vector &rTypes, int32_t oper) {
+  for (int i = 0; i < lTypes.size(); ++i) {
+    for (int j = 0; j < rTypes.size(); ++j) {
+      auto fp = filterGetCompFuncEx(lTypes[i], rTypes[j], oper);
+      switch (lTypes[i]) {
+        case TSDB_DATA_TYPE_TINYINT:
+          doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+          break;
+        case TSDB_DATA_TYPE_SMALLINT:
+          doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+          break;
+        case TSDB_DATA_TYPE_INT:
+          doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+          break;
+        case TSDB_DATA_TYPE_BIGINT:
+          doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+          break;
+        case TSDB_DATA_TYPE_UTINYINT:
+          doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+          break;
+        case TSDB_DATA_TYPE_USMALLINT:
+          doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+          break;
+        case TSDB_DATA_TYPE_UINT:
+          doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+          break;
+        case TSDB_DATA_TYPE_UBIGINT:
+          doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+          break;
+        default:
+          FAIL();
+      }
+    }
+  }
+}
+
+TEST(dataCompareTest, signed_and_unsigned_int) {
+  std::vector lType = {TSDB_DATA_TYPE_TINYINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_INT,
+                                TSDB_DATA_TYPE_BIGINT};
+  std::vector rType = {TSDB_DATA_TYPE_UTINYINT, TSDB_DATA_TYPE_USMALLINT, TSDB_DATA_TYPE_UINT,
+                                TSDB_DATA_TYPE_UBIGINT};
+
+  doCompare(lType, rType, OP_TYPE_GREATER_THAN);
+  doCompare(rType, lType, OP_TYPE_GREATER_THAN);
+}
 
 int main(int argc, char **argv) {
   taosSeedRand(taosGetTimestampSec());
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 28a345c766..534f5f0597 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -189,6 +189,11 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
     return -1;
   }
 
+  if (streamMetaCommit(pMeta) < 0) {
+    tFreeStreamTask(pTask);
+    return -1;
+  }
+
   taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
   taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
   return 0;
diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c
index f9f14c2e00..3506d477d3 100644
--- a/source/libs/sync/src/syncRespMgr.c
+++ b/source/libs/sync/src/syncRespMgr.c
@@ -171,6 +171,8 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
 }
 
 void syncRespCleanRsp(SSyncRespMgr *pObj) {
+  if (pObj == NULL) return;
+
   SSyncNode *pNode = pObj->data;
   sTrace("vgId:%d, clean all resp", pNode->vgId);
 
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index c23d6d0a1f..ea35f1cfe5 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -587,12 +587,12 @@ void* destroyConnPool(SCliThrd* pThrd) {
 
 static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
   void*      pool = pThrd->pool;
-  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
+  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
   STrans*    pTranInst = pThrd->pTransInst;
   if (plist == NULL) {
     SConnList list = {0};
-    taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
-    plist = taosHashGet(pool, key, strlen(key));
+    taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
+    plist = taosHashGet(pool, key, strlen(key) + 1);
 
     SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
     QUEUE_INIT(&nList->msgQ);
@@ -627,11 +627,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
 static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
   void*      pool = pThrd->pool;
   STrans*    pTransInst = pThrd->pTransInst;
-  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
+  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
   if (plist == NULL) {
     SConnList list = {0};
-    taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
-    plist = taosHashGet(pool, key, strlen(key));
+    taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
+    plist = taosHashGet(pool, key, strlen(key) + 1);
 
     SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
     QUEUE_INIT(&nList->msgQ);
@@ -717,7 +717,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
   cliDestroyConnMsgs(conn, false);
 
   if (conn->list == NULL) {
-    conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
+    conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip) + 1);
   }
 
   SConnList* pList = conn->list;
@@ -822,7 +822,8 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
     return;
   }
   if (nread < 0) {
-    tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
+    tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
+           T_REF_VAL_GET(conn));
     conn->broken = true;
     cliHandleExcept(conn);
   }
@@ -875,8 +876,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
     connList->list->numOfConn--;
     connList->size--;
   } else {
-    SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
-    connList->list->numOfConn--;
+    SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip) + 1);
+    if (connList != NULL) connList->list->numOfConn--;
   }
   conn->list = NULL;
   pThrd->newConnCount--;
@@ -1269,7 +1270,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
 
     if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
         (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
-      SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
+      SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1);
       int64_t        cTimestamp = taosGetTimestampMs();
       if (item != NULL) {
         int32_t elapse = cTimestamp - item->timestamp;
@@ -1281,7 +1282,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
         }
       } else {
         SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
-        taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
+        taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1, &item, sizeof(SFailFastItem));
       }
     }
   } else {
@@ -1459,7 +1460,7 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
 }
 static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
   uint32_t  addr = 0;
-  uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
+  uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
   if (v == NULL) {
     addr = taosGetIpv4FromFqdn(fqdn);
     if (addr == 0xffffffff) {
@@ -1468,7 +1469,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
       return addr;
     }
 
-    taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
+    taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
   } else {
     addr = *v;
   }
diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c
index 28fb474972..269c7ecf9b 100644
--- a/source/libs/transport/src/transSvr.c
+++ b/source/libs/transport/src/transSvr.c
@@ -314,7 +314,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
     return;
   }
 
-  tWarn("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread));
+  tDebug("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread));
   if (nread < 0) {
     conn->broken = true;
     if (conn->status == ConnAcquire) {
diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c
index f7ada8be84..206cb75963 100644
--- a/source/libs/wal/src/walRead.c
+++ b/source/libs/wal/src/walRead.c
@@ -37,7 +37,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
   if (cond) {
     pReader->cond = *cond;
   } else {
-    pReader->cond.scanUncommited = 0;
+//    pReader->cond.scanUncommited = 0;
     pReader->cond.scanNotApplied = 0;
     pReader->cond.scanMeta = 0;
     pReader->cond.enableRef = 0;
@@ -74,13 +74,18 @@ int32_t walNextValidMsg(SWalReader *pReader) {
   int64_t lastVer = walGetLastVer(pReader->pWal);
   int64_t committedVer = walGetCommittedVer(pReader->pWal);
   int64_t appliedVer = walGetAppliedVer(pReader->pWal);
-  int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
-  endVer = TMIN(appliedVer, endVer);
+  while(appliedVer < committedVer){   // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
+    wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer);
+    taosMsleep(1);
+    appliedVer = walGetAppliedVer(pReader->pWal);
+  }
+//  int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
+//  endVer = TMIN(appliedVer, endVer);
 
   wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
-         ", applied index:%" PRId64 ", end index:%" PRId64,
-         pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
-  while (fetchVer <= endVer) {
+         ", applied index:%" PRId64,
+         pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
+  while (fetchVer <= committedVer) {
     if (walFetchHeadNew(pReader, fetchVer) < 0) {
       return -1;
     }
diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c
index f8f78ae6a5..dc57ed97b2 100644
--- a/source/util/src/tcompare.c
+++ b/source/util/src/tcompare.c
@@ -308,17 +308,19 @@ int32_t compareInt8Uint16(const void *pLeft, const void *pRight) {
 
 int32_t compareInt8Uint32(const void *pLeft, const void *pRight) {
   int8_t   left = GET_INT8_VAL(pLeft);
+  if (left < 0) return -1;
   uint32_t right = GET_UINT32_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if ((uint32_t)left > right) return 1;
+  if ((uint32_t)left < right) return -1;
   return 0;
 }
 
 int32_t compareInt8Uint64(const void *pLeft, const void *pRight) {
   int8_t   left = GET_INT8_VAL(pLeft);
+  if (left < 0) return -1;
   uint64_t right = GET_UINT64_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if ((uint64_t)left > right) return 1;
+  if ((uint64_t)left < right) return -1;
   return 0;
 }
 
@@ -380,17 +382,19 @@ int32_t compareInt16Uint16(const void *pLeft, const void *pRight) {
 
 int32_t compareInt16Uint32(const void *pLeft, const void *pRight) {
   int16_t  left = GET_INT16_VAL(pLeft);
+  if (left < 0) return -1;
   uint32_t right = GET_UINT32_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if ((uint32_t)left > right) return 1;
+  if ((uint32_t)left < right) return -1;
   return 0;
 }
 
 int32_t compareInt16Uint64(const void *pLeft, const void *pRight) {
   int16_t  left = GET_INT16_VAL(pLeft);
+  if (left < 0) return -1;
   uint64_t right = GET_UINT64_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if ((uint64_t)left > right) return 1;
+  if ((uint64_t)left < right) return -1;
   return 0;
 }
 
@@ -452,17 +456,19 @@ int32_t compareInt32Uint16(const void *pLeft, const void *pRight) {
 
 int32_t compareInt32Uint32(const void *pLeft, const void *pRight) {
   int32_t  left = GET_INT32_VAL(pLeft);
+  if (left < 0) return -1;
   uint32_t right = GET_UINT32_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if ((uint32_t)left > right) return 1;
+  if ((uint32_t)left < right) return -1;
   return 0;
 }
 
 int32_t compareInt32Uint64(const void *pLeft, const void *pRight) {
   int32_t  left = GET_INT32_VAL(pLeft);
+  if (left < 0) return -1;
   uint64_t right = GET_UINT64_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if ((uint64_t)left > right) return 1;
+  if ((uint64_t)left < right) return -1;
   return 0;
 }
 
@@ -532,9 +538,10 @@ int32_t compareInt64Uint32(const void *pLeft, const void *pRight) {
 
 int32_t compareInt64Uint64(const void *pLeft, const void *pRight) {
   int64_t  left = GET_INT64_VAL(pLeft);
+  if (left < 0) return -1;
   uint64_t right = GET_UINT64_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if ((uint64_t)left > right) return 1;
+  if ((uint64_t)left < right) return -1;
   return 0;
 }
 
@@ -857,24 +864,27 @@ int32_t compareUint16Uint64(const void *pLeft, const void *pRight) {
 int32_t compareUint32Int8(const void *pLeft, const void *pRight) {
   uint32_t left = GET_UINT32_VAL(pLeft);
   int8_t   right = GET_INT8_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if (right < 0) return 1;
+  if (left > (uint32_t)right) return 1;
+  if (left < (uint32_t)right) return -1;
   return 0;
 }
 
 int32_t compareUint32Int16(const void *pLeft, const void *pRight) {
   uint32_t left = GET_UINT32_VAL(pLeft);
   int16_t  right = GET_INT16_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if (right < 0) return 1;
+  if (left > (uint32_t)right) return 1;
+  if (left < (uint32_t)right) return -1;
   return 0;
 }
 
 int32_t compareUint32Int32(const void *pLeft, const void *pRight) {
   uint32_t left = GET_UINT32_VAL(pLeft);
   int32_t  right = GET_INT32_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if (right < 0) return 1;
+  if (left > (uint32_t)right) return 1;
+  if (left < (uint32_t)right) return -1;
   return 0;
 }
 
@@ -929,32 +939,36 @@ int32_t compareUint32Uint64(const void *pLeft, const void *pRight) {
 int32_t compareUint64Int8(const void *pLeft, const void *pRight) {
   uint64_t left = GET_UINT64_VAL(pLeft);
   int8_t   right = GET_INT8_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if (right < 0) return 1;
+  if (left > (uint64_t)right) return 1;
+  if (left < (uint64_t)right) return -1;
   return 0;
 }
 
 int32_t compareUint64Int16(const void *pLeft, const void *pRight) {
   uint64_t left = GET_UINT64_VAL(pLeft);
   int16_t  right = GET_INT16_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if (right < 0) return 1;
+  if (left > (uint64_t)right) return 1;
+  if (left < (uint64_t)right) return -1;
   return 0;
 }
 
 int32_t compareUint64Int32(const void *pLeft, const void *pRight) {
   uint64_t left = GET_UINT64_VAL(pLeft);
   int32_t  right = GET_INT32_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if (right < 0) return 1;
+  if (left > (uint64_t)right) return 1;
+  if (left < (uint64_t)right) return -1;
   return 0;
 }
 
 int32_t compareUint64Int64(const void *pLeft, const void *pRight) {
   uint64_t left = GET_UINT64_VAL(pLeft);
   int64_t  right = GET_INT64_VAL(pRight);
-  if (left > right) return 1;
-  if (left < right) return -1;
+  if (right < 0) return 1;
+  if (left > (uint64_t)right) return 1;
+  if (left < (uint64_t)right) return -1;
   return 0;
 }
 
diff --git a/tests/pytest/auto_crash_gen.py b/tests/pytest/auto_crash_gen.py
index 5af2f055cd..00e1786399 100755
--- a/tests/pytest/auto_crash_gen.py
+++ b/tests/pytest/auto_crash_gen.py
@@ -342,12 +342,29 @@ def main():
         print('======== crash_gen run sucess and exit as expected ========')
 
     try:
-        text = f'''exit status: {msg_dict[status]} 
-                   git commit :  {git_commit}
-                   hostname: {hostname}
-                   start time: {starttime}
-                   end time: {endtime}
-                   cmd: {crash_cmds}'''
+        cmd = crash_cmds.split('&')[2]
+        if status == 0:
+            log_dir = "none"            
+        else:
+            log_dir= "/root/pxiao/crash_gen_logs" 
+        
+        if status == 3:
+            core_dir = "/root/pxiao/crash_gen_logs"
+        else:
+            core_dir = "none"
+            
+        text = f'''
+        exit status: {msg_dict[status]}
+        test scope: crash_gen
+        owner: pxiao
+        hostname: {hostname}
+        start time: {starttime}
+        end time: {endtime}
+        git commit :  {git_commit}
+        log dir: {log_dir}
+        core dir: {core_dir}
+        cmd: {cmd}'''
+                
         send_msg(get_msg(text))  
     except Exception as e:
         print("exception:", e)
diff --git a/tests/pytest/auto_crash_gen_valgrind.py b/tests/pytest/auto_crash_gen_valgrind.py
index 49e2c43f84..e37cda0a27 100755
--- a/tests/pytest/auto_crash_gen_valgrind.py
+++ b/tests/pytest/auto_crash_gen_valgrind.py
@@ -377,13 +377,30 @@ def main():
         print('======== crash_gen run sucess and exit as expected ========')
 
     try:
-        text = f'''exit status: {msg_dict[status]} 
-                   git commit :  {git_commit}
-                   hostname: {hostname}
-                   start time: {starttime}
-                   end time: {endtime}
-                   cmd: {crash_cmds}'''
-        send_msg(get_msg(text))  
+        cmd = crash_cmds.split('&')[2]
+        if status == 0:
+            log_dir = "none"            
+        else:
+            log_dir= "/root/pxiao/crash_gen_logs" 
+        
+        if status == 3:
+            core_dir = "/root/pxiao/crash_gen_logs"
+        else:
+            core_dir = "none"
+            
+        text = f'''
+        exit status: {msg_dict[status]}
+        test scope: crash_gen
+        owner: pxiao
+        hostname: {hostname}
+        start time: {starttime}
+        end time: {endtime}
+        git commit :  {git_commit}
+        log dir: {log_dir}
+        core dir: {core_dir}
+        cmd: {cmd}'''
+        
+        send_msg(get_msg(text))
     except Exception as e:
         print("exception:", e)
     exit(status)
diff --git a/tests/pytest/auto_crash_gen_valgrind_cluster.py b/tests/pytest/auto_crash_gen_valgrind_cluster.py
index 5189ff4262..af19836a83 100755
--- a/tests/pytest/auto_crash_gen_valgrind_cluster.py
+++ b/tests/pytest/auto_crash_gen_valgrind_cluster.py
@@ -377,12 +377,29 @@ def main():
         print('======== crash_gen run sucess and exit as expected ========')
 
     try:
-        text = f'''exit status: {msg_dict[status]} 
-                   git commit :  {git_commit}
-                   hostname: {hostname}
-                   start time: {starttime}
-                   end time: {endtime}
-                   cmd: {crash_cmds}'''
+        cmd = crash_cmds.split('&')[2]
+        if status == 0:
+            log_dir = "none"            
+        else:
+            log_dir= "/root/pxiao/crash_gen_logs" 
+        
+        if status == 3:
+            core_dir = "/root/pxiao/crash_gen_logs"
+        else:
+            core_dir = "none"
+            
+        text = f'''
+        exit status: {msg_dict[status]}
+        test scope: crash_gen
+        owner: pxiao
+        hostname: {hostname}
+        start time: {starttime}
+        end time: {endtime}
+        git commit :  {git_commit}
+        log dir: {log_dir}
+        core dir: {core_dir}
+        cmd: {cmd}'''
+                
         send_msg(get_msg(text))  
     except Exception as e:
         print("exception:", e)
diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim
index dccfc7f5d6..ded5d6f78a 100644
--- a/tests/script/tsim/alter/table.sim
+++ b/tests/script/tsim/alter/table.sim
@@ -657,6 +657,17 @@ if $data20 != null then
   return -1
 endi
 
+print =============== error 
+sql create table tb2023(ts timestamp, f int);
+sql_error alter table tb2023 add column v varchar(16375);
+sql_error alter table tb2023 add column v varchar(16385);
+sql_error alter table tb2023 add column v varchar(33100);
+sql alter table tb2023 add column v varchar(16374);
+sql desc tb2023
+sql alter table tb2023 drop column v
+sql_error alter table tb2023 add column v nchar(4094);
+sql alter table tb2023 add column v nchar(4093);
+sql desc tb2023
 print ======= over
 sql drop database d1
 sql select * from information_schema.ins_databases
diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file
index c3fb57c49b..85829ca7a7 100644
--- a/tests/system-test/win-test-file
+++ b/tests/system-test/win-test-file
@@ -279,7 +279,7 @@ python3 ./test.py -f 7-tmq/subscribeDb1.py
 python3 ./test.py -f 7-tmq/subscribeDb2.py
 python3 ./test.py -f 7-tmq/subscribeDb3.py
 python3 ./test.py -f 7-tmq/subscribeDb4.py
-#python3 ./test.py -f 7-tmq/subscribeStb.py
+python3 ./test.py -f 7-tmq/subscribeStb.py
 python3 ./test.py -f 7-tmq/subscribeStb0.py
 python3 ./test.py -f 7-tmq/subscribeStb1.py
 python3 ./test.py -f 7-tmq/subscribeStb2.py