diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in
index 87f0579f44..897ccdd158 100644
--- a/cmake/taostools_CMakeLists.txt.in
+++ b/cmake/taostools_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
- GIT_TAG d11f210
+ GIT_TAG 04296a5
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/examples/JDBC/taosdemo/pom.xml b/examples/JDBC/taosdemo/pom.xml
index 68224bbad5..4731d8e237 100644
--- a/examples/JDBC/taosdemo/pom.xml
+++ b/examples/JDBC/taosdemo/pom.xml
@@ -10,7 +10,7 @@
Demo project for TDengine
- 5.3.20
+ 5.3.26
diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h
index 7b65c06b85..60a4a8c605 100644
--- a/include/libs/function/functionMgt.h
+++ b/include/libs/function/functionMgt.h
@@ -241,6 +241,7 @@ int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId);
+char* fmGetFuncName(int32_t funcId);
#ifdef __cplusplus
}
diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c
index 6bd8b01842..b1299c16e8 100644
--- a/source/client/src/clientRawBlockWrite.c
+++ b/source/client/src/clientRawBlockWrite.c
@@ -183,7 +183,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
}
string = cJSON_PrintUnformatted(json);
-end:
+ end:
cJSON_Delete(json);
tFreeSMAltertbReq(&req);
return string;
@@ -205,7 +205,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
}
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
uDebug("processCreateStb %s", string);
-_err:
+ _err:
tDecoderClear(&coder);
return string;
}
@@ -227,7 +227,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
uDebug("processAlterStb %s", string);
-_err:
+ _err:
tDecoderClear(&coder);
return string;
}
@@ -309,7 +309,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
cJSON_AddItemToArray(tags, tag);
}
-end:
+ end:
cJSON_AddItemToObject(json, "tags", tags);
taosArrayDestroy(pTagVals);
}
@@ -368,7 +368,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
uDebug("processCreateTable :%s", string);
}
-_exit:
+ _exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
@@ -408,7 +408,7 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
}
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
uDebug("processAutoCreateTable :%s", string);
-_exit:
+ _exit:
for (int i = 0; i < rsp->createTableNum; i++) {
tDecoderClear(&decoder[i]);
taosMemoryFreeClear(pCreateReq[i].comment);
@@ -535,7 +535,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json);
uDebug("processAlterTable :%s", string);
-_exit:
+ _exit:
cJSON_Delete(json);
tDecoderClear(&decoder);
return string;
@@ -569,7 +569,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json);
uDebug("processDropSTable :%s", string);
-_exit:
+ _exit:
cJSON_Delete(json);
tDecoderClear(&decoder);
return string;
@@ -609,7 +609,7 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json);
uDebug("processDeleteTable :%s", string);
-_exit:
+ _exit:
cJSON_Delete(json);
tDecoderClear(&coder);
return string;
@@ -652,7 +652,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json);
uDebug("processDropTable :%s", string);
-_exit:
+ _exit:
cJSON_Delete(json);
tDecoderClear(&decoder);
return string;
@@ -742,7 +742,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
-end:
+ end:
destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder);
@@ -839,7 +839,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
-end:
+ end:
destroyRequest(pRequest);
tDecoderClear(&coder);
return code;
@@ -901,9 +901,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
- .requestId = pRequest->requestId,
- .requestObjRefId = pRequest->self,
- .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
+ .requestId = pRequest->requestId,
+ .requestObjRefId = pRequest->self,
+ .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table
@@ -987,7 +987,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code;
-end:
+ end:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
@@ -1058,9 +1058,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
- .requestId = pRequest->requestId,
- .requestObjRefId = pRequest->self,
- .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
+ .requestId = pRequest->requestId,
+ .requestObjRefId = pRequest->self,
+ .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
@@ -1132,7 +1132,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
}
code = pRequest->code;
-end:
+ end:
taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest);
tDecoderClear(&coder);
@@ -1201,7 +1201,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
}
taos_free_result(res);
-end:
+ end:
tDecoderClear(&coder);
return code;
}
@@ -1249,9 +1249,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
}
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
- .requestId = pRequest->requestId,
- .requestObjRefId = pRequest->self,
- .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
+ .requestId = pRequest->requestId,
+ .requestObjRefId = pRequest->self,
+ .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
SVgroupInfo pInfo = {0};
SName pName = {0};
@@ -1311,7 +1311,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
code = handleAlterTbExecRes(pRes->res, pCatalog);
}
}
-end:
+ end:
taosArrayDestroy(pArray);
if (pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData);
@@ -1399,7 +1399,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
-end:
+ end:
taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
@@ -1481,7 +1481,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
-end:
+ end:
taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
@@ -1601,6 +1601,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
uError("WriteRaw:rawBlockBindData failed");
goto end;
}
+ taosMemoryFreeClear(pTableMeta);
}
code = smlBuildOutput(pQuery, pVgHash);
@@ -1612,7 +1613,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
-end:
+ end:
tDeleteSMqDataRsp(&rspObj.rsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
@@ -1707,6 +1708,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
tDecoderClear(&decoderTmp);
+ tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
uError("WriteRaw: tDecodeSVCreateTbReq error");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
@@ -1715,15 +1717,19 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
if (pCreateReq.type != TSDB_CHILD_TABLE) {
uError("WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName);
code = TSDB_CODE_TSC_INVALID_VALUE;
+ tDecoderClear(&decoderTmp);
+ tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
goto end;
}
if (strcmp(tbName, pCreateReq.name) == 0) {
cloneSVreateTbReq(&pCreateReq, &pCreateReqDst);
// pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb);
tDecoderClear(&decoderTmp);
+ tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
break;
}
tDecoderClear(&decoderTmp);
+ tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
}
SVgroupInfo vg;
@@ -1774,6 +1780,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
pCreateReqDst = NULL;
+ taosMemoryFreeClear(pTableMeta);
}
code = smlBuildOutput(pQuery, pVgHash);
@@ -1785,7 +1792,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
-end:
+ end:
tDeleteSTaosxRsp(&rspObj.rsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index 77bbd0be1a..111ca28cdc 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -1116,6 +1116,7 @@ _failed:
}
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
+ const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
void* buf = NULL;
@@ -1209,7 +1210,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
- if (retryCnt++ > 40) {
+ if (retryCnt++ > MAX_RETRY_COUNT) {
goto FAIL;
}
@@ -1811,7 +1812,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pRspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&pRspWrapper);
-
if (pRspWrapper == NULL) {
return NULL;
}
@@ -1831,7 +1831,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
if (pDataRsp->head.epoch == consumerEpoch) {
- // todo fix it: race condition
SMqClientVg* pVg = pollRspWrapper->vgHandle;
// update the epset
@@ -1843,6 +1842,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset;
}
+ // update the local offset value only for the returned values.
pVg->currentOffset = pDataRsp->rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c
index 2bb8708372..34808aa389 100644
--- a/source/common/src/tdataformat.c
+++ b/source/common/src/tdataformat.c
@@ -2439,6 +2439,12 @@ _exit:
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
char *data) {
int32_t code = 0;
+ if(data == NULL){
+ for (int32_t i = 0; i < nRows; ++i) {
+ code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
+ }
+ goto _exit;
+ }
if (IS_VAR_DATA_TYPE(type)) { // var-length data type
for (int32_t i = 0; i < nRows; ++i) {
diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h
index c2b38f5cd1..9037644602 100644
--- a/source/dnode/vnode/src/inc/tq.h
+++ b/source/dnode/vnode/src/inc/tq.h
@@ -109,23 +109,18 @@ typedef struct {
} STqPushEntry;
struct STQ {
- SVnode* pVnode;
- char* path;
- int64_t walLogLastVer;
-
- SRWLatch pushLock;
-
- SHashObj* pPushMgr; // consumerId -> STqPushEntry
- SHashObj* pHandle; // subKey -> STqHandle
- SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
-
+ SVnode* pVnode;
+ char* path;
+ int64_t walLogLastVer;
+ SRWLatch lock;
+ SHashObj* pPushMgr; // consumerId -> STqPushEntry
+ SHashObj* pHandle; // subKey -> STqHandle
+ SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore;
-
- TDB* pMetaDB;
- TTB* pExecStore;
- TTB* pCheckStore;
-
- SStreamMeta* pStreamMeta;
+ TDB* pMetaDB;
+ TTB* pExecStore;
+ TTB* pCheckStore;
+ SStreamMeta* pStreamMeta;
};
typedef struct {
@@ -144,8 +139,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
// tqExec
-int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
-// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
+int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type);
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
@@ -164,7 +158,7 @@ typedef struct {
int32_t size;
} STqOffsetHead;
-STqOffsetStore* tqOffsetOpen();
+STqOffsetStore* tqOffsetOpen(STQ* pTq);
void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h
index 0b38ce6d24..452b1f6c0b 100644
--- a/source/dnode/vnode/src/inc/tsdb.h
+++ b/source/dnode/vnode/src/inc/tsdb.h
@@ -687,6 +687,8 @@ typedef struct SSttBlockLoadInfo {
STSchema *pSchema;
int16_t *colIds;
int32_t numOfCols;
+ bool checkRemainingRow;
+ bool isLast;
bool sttBlockLoaded;
int32_t numOfStt;
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index a58c9530ba..bf97f893f8 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -51,7 +51,7 @@ void tqCleanUp() {
}
}
-static void destroySTqHandle(void* data) {
+static void destroyTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task);
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
@@ -89,9 +89,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
- taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);
+ taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
- taosInitRWLatch(&pTq->pushLock);
+ taosInitRWLatch(&pTq->lock);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
@@ -373,7 +373,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
char formatBuf[80];
tFormatOffset(formatBuf, 80, pOffsetVal);
- tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.",
+ tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.",
consumerId, pHandle->subKey, vgId, formatBuf);
return 0;
} else {
@@ -431,20 +431,24 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
}
-static int32_t processSubColumn(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
- int32_t vgId = TD_VID(pTq->pVnode);
+#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
+
+static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
+ SRpcMsg* pMsg, STqOffsetVal* pOffset) {
+ uint64_t consumerId = pRequest->consumerId;
+ int32_t vgId = TD_VID(pTq->pVnode);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
// lock
- taosWLockLatch(&pTq->pushLock);
- qSetTaskId(pHandle->execHandle.task, pRequest->consumerId, pRequest->reqId);
+ taosWLockLatch(&pTq->lock);
- int code = tqScanData(pTq, pHandle, &dataRsp, offset);
+ qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
+ int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if(code != 0) {
tDeleteSMqDataRsp(&dataRsp);
- taosWUnLockLatch(&pTq->pushLock);
+ taosWUnLockLatch(&pTq->lock);
return TSDB_CODE_TMQ_CONSUMER_ERROR;
}
@@ -452,24 +456,25 @@ static int32_t processSubColumn(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
- taosWUnLockLatch(&pTq->pushLock);
+ taosWUnLockLatch(&pTq->lock);
return code;
}
- taosWUnLockLatch(&pTq->pushLock);
+ taosWUnLockLatch(&pTq->lock);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
- ", ts:%" PRId64,
- pRequest->consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
- dataRsp.rspOffset.ts);
+ ", ts:%" PRId64 ", reqId:0x%" PRIx64,
+ consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
+ dataRsp.rspOffset.ts, pRequest->reqId);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
-static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
+
+static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
int code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SWalCkHead* pCkHead = NULL;
@@ -485,17 +490,14 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe
if (metaRsp.metaRspLen > 0) {
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
- pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
- metaRsp.rspOffset.ts);
+ pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
taosMemoryFree(metaRsp.metaRsp);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
- ",ts:%" PRId64,
- pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
- taosxRsp.rspOffset.ts);
+ ",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
@@ -505,7 +507,8 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe
}
}
- if (offset->type == TMQ_OFFSET__LOG){
+
+ if (offset->type == TMQ_OFFSET__LOG) {
int64_t fetchVer = offset->version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
@@ -513,15 +516,13 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
-
walSetReaderCapacity(pHandle->pWalReader, 2048);
-
+ int totalRows = 0;
while (1) {
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
- ", found new consumer epoch %d, discard req epoch %d",
- pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
+ ", found new consumer epoch %d, discard req epoch %d", pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
break;
}
@@ -537,30 +538,16 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", pRequest->consumerId,
pRequest->epoch, vgId, fetchVer, pHead->msgType);
- if (pHead->msgType == TDMT_VND_SUBMIT) {
- SPackedData submit = {
- .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
- .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
- .ver = pHead->version,
- };
-
- if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
- tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
- pRequest->subKey);
- return -1;
- }
-
- if (taosxRsp.blockNum > 0) {
- tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
+ // process meta
+ if (pHead->msgType != TDMT_VND_SUBMIT) {
+ if(totalRows > 0) {
+ tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
- } else {
- fetchVer++;
}
- } else {
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
metaRsp.resMsgType = pHead->msgType;
@@ -577,6 +564,31 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
+
+ // process data
+ SPackedData submit = {
+ .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
+ .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
+ .ver = pHead->version,
+ };
+
+ if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) {
+ tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
+ pRequest->subKey);
+ taosMemoryFreeClear(pCkHead);
+ tDeleteSTaosxRsp(&taosxRsp);
+ return -1;
+ }
+
+ if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
+ tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
+ code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
+ tDeleteSTaosxRsp(&taosxRsp);
+ taosMemoryFreeClear(pCkHead);
+ return code;
+ } else {
+ fetchVer++;
+ }
}
}
@@ -585,15 +597,14 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe
return 0;
}
-static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
+static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
int32_t code = -1;
STqOffsetVal offset = {0};
STqOffsetVal reqOffset = pRequest->reqOffset;
// 1. reset the offset if needed
- if (reqOffset.type > 0) {
- offset = reqOffset;
- } else { // handle the reset offset cases, according to the consumer's choice.
+ if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
+ // handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false;
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) {
@@ -604,16 +615,19 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
if (blockReturned) {
return 0;
}
+ } else { // use the consumer specified offset
+ // the offset value can not be monotonious increase??
+ offset = reqOffset;
}
- // this is a normal subscription requirement
+ // this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
- return processSubColumn(pTq, pHandle, pRequest, pMsg, &offset);
+ return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
}
// todo handle the case where re-balance occurs.
// for taosx
- return processSubDbOrTable(pTq, pHandle, pRequest, pMsg, &offset);
+ return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
}
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
@@ -638,31 +652,31 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
// 2. check re-balance status
- taosRLockLatch(&pTq->pushLock);
+ taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
- taosRUnLockLatch(&pTq->pushLock);
+ taosRUnLockLatch(&pTq->lock);
return -1;
}
- taosRUnLockLatch(&pTq->pushLock);
+ taosRUnLockLatch(&pTq->lock);
- taosWLockLatch(&pTq->pushLock);
// 3. update the epoch value
+ taosWLockLatch(&pTq->lock);
int32_t savedEpoch = pHandle->epoch;
if (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
pHandle->epoch = reqEpoch;
}
- taosWUnLockLatch(&pTq->pushLock);
+ taosWUnLockLatch(&pTq->lock);
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
- return extractDataForMq(pTq, pHandle, &req, pMsg);
+ return doPollDataForMq(pTq, pHandle, &req, pMsg);
}
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
@@ -670,12 +684,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
- taosWLockLatch(&pTq->pushLock);
+ taosWLockLatch(&pTq->lock);
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
}
- taosWUnLockLatch(&pTq->pushLock);
+ taosWUnLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) {
@@ -739,18 +753,18 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SVnode* pVnode = pTq->pVnode;
int32_t vgId = TD_VID(pVnode);
- tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
+ tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
if (req.oldConsumerId != -1) {
- tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "",
+ tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
}
if (req.newConsumerId == -1) {
- tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
+ tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
taosMemoryFree(req.qmsg);
return 0;
}
@@ -840,28 +854,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg);
return tqMetaSaveHandle(pTq, req.subKey, pHandle);
- }
+ } else {
+ tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
+ req.newConsumerId);
- tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
- req.newConsumerId);
+ taosWLockLatch(&pTq->lock);
+ atomic_store_32(&pHandle->epoch, -1);
- taosWLockLatch(&pTq->pushLock);
- atomic_store_32(&pHandle->epoch, -1);
+ // remove if it has been register in the push manager, and return one empty block to consumer
+ tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
- // remove if it has been register in the push manager, and return one empty block to consumer
- tqRemovePushEntry(pTq, req.subKey, (int32_t) strlen(req.subKey), pHandle->consumerId, true);
+ atomic_store_64(&pHandle->consumerId, req.newConsumerId);
+ atomic_add_fetch_32(&pHandle->epoch, 1);
- atomic_store_64(&pHandle->consumerId, req.newConsumerId);
- atomic_add_fetch_32(&pHandle->epoch, 1);
+ if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
+ qStreamCloseTsdbReader(pHandle->execHandle.task);
+ }
- if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
- qStreamCloseTsdbReader(pHandle->execHandle.task);
- }
-
- taosWUnLockLatch(&pTq->pushLock);
- if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
- taosMemoryFree(req.qmsg);
- return -1;
+ taosWUnLockLatch(&pTq->lock);
+ if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
+ taosMemoryFree(req.qmsg);
+ return -1;
+ }
}
}
diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c
index 0dbec85e6e..87c762dd03 100644
--- a/source/dnode/vnode/src/tq/tqExec.c
+++ b/source/dnode/vnode/src/tq/tqExec.c
@@ -183,10 +183,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
return 0;
}
-int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp) {
+int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) {
STqExecHandle* pExec = &pHandle->execHandle;
- /*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/
-
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
SArray* pSchemas = taosArrayInit(0, sizeof(void*));
@@ -201,7 +199,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
}
if (pRsp->withTbName) {
- /*int64_t uid = pExec->pExecReader->msgIter.uid;*/
int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
@@ -243,6 +240,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
+ totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
@@ -251,13 +249,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pExecReader;
- /*tqReaderSetDataMsg(pReader, pReq, 0);*/
tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) {
- /*SSDataBlock block = {0};*/
- /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
- /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
- /*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL;
@@ -302,15 +295,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder);
}
- /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
- /*pTq->pVnode->config.tsdbCfg.precision);*/
- /*blockDataFreeRes(&block);*/
- /*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/
- /*pRsp->blockNum++;*/
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
+ *totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW);
@@ -320,9 +309,5 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
}
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
-// if (pRsp->blockNum == 0) {
-// return -1;
-// }
-
return 0;
}
diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c
index 01d8e7cf14..797aeb3f04 100644
--- a/source/dnode/vnode/src/tq/tqPush.c
+++ b/source/dnode/vnode/src/tq/tqPush.c
@@ -213,7 +213,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost
- taosWLockLatch(&pTq->pushLock);
+ taosWLockLatch(&pTq->lock);
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
if (numOfRegisteredPush > 0) {
@@ -231,7 +231,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosArrayDestroy(cachedKeyLens);
// unlock
- taosWUnLockLatch(&pTq->pushLock);
+ taosWUnLockLatch(&pTq->lock);
return -1;
}
@@ -320,7 +320,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosMemoryFree(data);
}
// unlock
- taosWUnLockLatch(&pTq->pushLock);
+ taosWUnLockLatch(&pTq->lock);
}
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index 3d01184e78..6fc8ad8be6 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -590,6 +590,7 @@ typedef struct {
SDataFReader **pDataFReader;
TSDBROW row;
+ bool checkRemainingRow;
SMergeTree mergeTree;
SMergeTree *pMergeTree;
SSttBlockLoadInfo *pLoadInfo;
@@ -600,7 +601,6 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
int nCols) {
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
int32_t code = 0;
- bool checkRemainingRow = true;
switch (state->state) {
case SFSLASTNEXTROW_FS:
@@ -633,12 +633,25 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
if (code) goto _err;
}
- state->pLoadInfo->colIds = aCols;
- state->pLoadInfo->numOfCols = nCols;
+ for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
+ state->pLoadInfo[i].colIds = aCols;
+ state->pLoadInfo[i].numOfCols = nCols;
+ state->pLoadInfo[i].isLast = isLast;
+ }
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
state->pMergeTree = &state->mergeTree;
+ state->state = SFSLASTNEXTROW_BLOCKROW;
+ }
+ case SFSLASTNEXTROW_BLOCKROW: {
+ if (nCols != state->pLoadInfo->numOfCols) {
+ for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
+ state->pLoadInfo[i].numOfCols = nCols;
+
+ state->pLoadInfo[i].checkRemainingRow = state->checkRemainingRow;
+ }
+ }
bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) {
if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) {
@@ -649,76 +662,23 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
state->state = SFSLASTNEXTROW_FILESET;
goto _next_fileset;
}
- state->state = SFSLASTNEXTROW_BLOCKROW;
- checkRemainingRow = false;
- }
- case SFSLASTNEXTROW_BLOCKROW: {
- bool skipRow = false;
- do {
- bool hasVal = false;
- state->row = tMergeTreeGetRow(&state->mergeTree);
- *ppRow = &state->row;
- if (nCols != state->pLoadInfo->numOfCols) {
- state->pLoadInfo->numOfCols = nCols;
- }
- hasVal = tMergeTreeNext(&state->mergeTree);
- if (TSDBROW_TS(&state->row) <= state->lastTs) {
- *pIgnoreEarlierTs = true;
- *ppRow = NULL;
- return code;
- }
+ state->row = tMergeTreeGetRow(&state->mergeTree);
+ *ppRow = &state->row;
- *pIgnoreEarlierTs = false;
- if (!hasVal) {
- state->state = SFSLASTNEXTROW_FILESET;
- break;
- }
+ if (TSDBROW_TS(&state->row) <= state->lastTs) {
+ *pIgnoreEarlierTs = true;
+ *ppRow = NULL;
+ return code;
+ }
- if (checkRemainingRow) {
- bool skipBlock = true;
-
- SBlockData *pBlockData = state->row.pBlockData;
-
- for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
- for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
- SColData *pColData = &pBlockData->aColData[colIndex];
- int16_t cid = pColData->cid;
-
- if (cid == aCols[inputColIndex]) {
- if (isLast && (pColData->flag & HAS_VALUE)) {
- skipBlock = false;
- break;
- } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
- skipBlock = false;
- break;
- }
- }
- }
- }
- /*
- for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
- SColData *pColData = &pBlockData->aColData[colIndex];
- int16_t cid = pColData->cid;
-
- if (inputColIndex < nCols && cid == aCols[inputColIndex]) {
- if (isLast && (pColData->flag & HAS_VALUE)) {
- skipBlock = false;
- break;
- } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
- skipBlock = false;
- break;
- }
-
- ++inputColIndex;
- }
- }
- */
- if (skipBlock) {
- skipRow = true;
- }
- }
- } while (skipRow);
+ *pIgnoreEarlierTs = false;
+ if (!hasVal) {
+ state->state = SFSLASTNEXTROW_FILESET;
+ }
+ if (!state->checkRemainingRow) {
+ state->checkRemainingRow = true;
+ }
return code;
}
default:
diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
index 943b16116c..fa8870835c 100644
--- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
@@ -504,9 +504,34 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
pIter->iRow += step;
while (1) {
+ bool skipBlock = false;
+
findNextValidRow(pIter, idStr);
- if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
+ if (pIter->pBlockLoadInfo->checkRemainingRow) {
+ skipBlock = true;
+ int16_t *aCols = pIter->pBlockLoadInfo->colIds;
+ int nCols = pIter->pBlockLoadInfo->numOfCols;
+ bool isLast = pIter->pBlockLoadInfo->isLast;
+ for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
+ for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
+ SColData *pColData = &pBlockData->aColData[colIndex];
+ int16_t cid = pColData->cid;
+
+ if (cid == aCols[inputColIndex]) {
+ if (isLast && (pColData->flag & HAS_VALUE)) {
+ skipBlock = false;
+ break;
+ } else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
+ skipBlock = false;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
tLDataIterNextBlock(pIter, idStr);
if (pIter->pSttBlk == NULL) { // no more data
goto _exit;
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 5ad9276c6c..96bce02b67 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -4507,6 +4507,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
int32_t i = 0, j = 0;
int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);
taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
+ size++;
while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
@@ -4519,10 +4520,21 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
+ i += 1;
+ size++;
}
j += 1;
}
}
+
+ while (j < numOfCols) {
+ if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
+ SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
+ taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
+ i += 1;
+ }
+ j++;
+ }
}
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
@@ -4602,8 +4614,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
} else if (pAgg->colId < pSup->colId[j]) {
i += 1;
} else if (pSup->colId[j] < pAgg->colId) {
- // ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID);
- pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg;
+ pResBlock->pBlockAgg[pSup->slotId[j]] = NULL;
+ *allHave = false;
j += 1;
}
}
@@ -4996,4 +5008,4 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
taosMemoryFreeClear(pReader->idStr);
pReader->idStr = taosStrdup(idstr);
-}
\ No newline at end of file
+}
diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
index 71458acce2..052e4ab2c1 100644
--- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c
+++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
@@ -335,6 +335,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
// commit json
if (!rollback) {
+ pWriter->info.state.committed = pWriter->ever;
pVnode->config = pWriter->info.config;
pVnode->state = (SVState){.committed = pWriter->info.state.committed,
.applied = pWriter->info.state.committed,
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 84602917f2..3d56e6c78a 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -559,6 +559,7 @@ typedef struct SStreamIntervalOperatorInfo {
STimeWindowAggSupp twAggSup;
bool invertible;
bool ignoreExpiredData;
+ bool ignoreExpiredDataSaved;
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
@@ -620,6 +621,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
+ bool ignoreExpiredDataSaved;
SArray* pUpdated;
SSHashObj* pStUpdated;
} SStreamSessionAggOperatorInfo;
@@ -637,6 +639,7 @@ typedef struct SStreamStateAggOperatorInfo {
void* pDelIterator;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
+ bool ignoreExpiredDataSaved;
SArray* pUpdated;
SSHashObj* pSeUpdated;
} SStreamStateAggOperatorInfo;
diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h
index fad4059515..726f0df1e8 100644
--- a/source/libs/executor/inc/tfill.h
+++ b/source/libs/executor/inc/tfill.h
@@ -92,8 +92,8 @@ typedef struct SResultRowData {
typedef struct SStreamFillLinearInfo {
TSKEY nextEnd;
- SArray* pDeltaVal; // double. value for Fill(linear).
- SArray* pNextDeltaVal; // double. value for Fill(linear).
+ SArray* pEndPoints;
+ SArray* pNextEndPoints;
int64_t winIndex;
bool hasNext;
} SStreamFillLinearInfo;
diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c
index dce358ab6d..953d614951 100644
--- a/source/libs/executor/src/executil.c
+++ b/source/libs/executor/src/executil.c
@@ -2042,7 +2042,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (!pBlock || pBlock->info.rows == 0) {
- qDebug("===stream===printDataBlock: Block is Null or Empty");
+ qDebug("===stream===%s: Block is Null or Empty", flag);
return;
}
char* pBuf = NULL;
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index 75d05feb67..b049fcb84b 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -888,7 +888,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
-
+ pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
+ pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
@@ -904,6 +905,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
+ pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
+ pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
@@ -917,6 +920,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
+ pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
+ pInfo->ignoreExpiredData = false;
}
// iterate operator tree
@@ -944,35 +949,23 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
- /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
- /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
-
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
- /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/
- /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
+ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
- /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
- /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
-
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
- /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/
- /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
+ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
- /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
- /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
-
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
- /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/
- /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
+ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
}
diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c
index 2a33e3527a..234f1a666c 100644
--- a/source/libs/executor/src/filloperator.c
+++ b/source/libs/executor/src/filloperator.c
@@ -447,9 +447,14 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
return NULL;
}
+void destroySPoint(void* ptr) {
+ SPoint* point = (SPoint*) ptr;
+ taosMemoryFreeClear(point->val);
+}
+
void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
- taosArrayDestroy(pFillLinear->pDeltaVal);
- taosArrayDestroy(pFillLinear->pNextDeltaVal);
+ taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint);
+ taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint);
taosMemoryFree(pFillLinear);
return NULL;
}
@@ -611,19 +616,15 @@ static void calcDeltaData(SSDataBlock* pBlock, int32_t rowId, SResultRowData* pR
}
}
-static void calcRowDeltaData(SResultRowData* pStartRow, SResultRowData* pEndRow, SArray* pDelta, SFillColInfo* pFillCol,
- int32_t numOfCol, int32_t winCount) {
+static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol,
+ int32_t numOfCol) {
for (int32_t i = 0; i < numOfCol; i++) {
if (!pFillCol[i].notFillCol) {
int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i);
- SResultCellData* pSCell = getResultCell(pStartRow, slotId);
- double start = 0.0;
- GET_TYPED_DATA(start, double, pSCell->type, pSCell->pData);
SResultCellData* pECell = getResultCell(pEndRow, slotId);
- double end = 0.0;
- GET_TYPED_DATA(end, double, pECell->type, pECell->pData);
- double delta = (end - start) / winCount;
- taosArraySet(pDelta, slotId, &delta);
+ SPoint* pPoint = taosArrayGet(pEndPoins, slotId);
+ pPoint->key = pEndRow->key;
+ memcpy(pPoint->val, pECell->pData, pECell->bytes);
}
}
}
@@ -674,10 +675,8 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS
setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo);
pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
- int32_t numOfWins = taosTimeCountInterval(pFillSup->prev.key, pFillSup->next.key, pFillSup->interval.sliding,
- pFillSup->interval.slidingUnit, pFillSup->interval.precision);
- calcRowDeltaData(&pFillSup->prev, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
- pFillSup->numOfAllCols, numOfWins);
+ calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
+ pFillSup->numOfAllCols);
pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->winIndex = 0;
} break;
@@ -780,25 +779,19 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_MID;
pFillInfo->pLinearInfo->nextEnd = nextWKey;
- int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding,
- pFillSup->interval.slidingUnit, pFillSup->interval.precision);
- calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
- pFillSup->numOfAllCols, numOfWins);
+ calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
+ pFillSup->numOfAllCols);
pFillInfo->pResRow = &pFillSup->prev;
- numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
- pFillSup->interval.precision);
- calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pNextDeltaVal, pFillSup->pAllColInfo,
- pFillSup->numOfAllCols, numOfWins);
+ calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
+ pFillSup->numOfAllCols);
pFillInfo->pLinearInfo->hasNext = true;
} else if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
- int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding,
- pFillSup->interval.slidingUnit, pFillSup->interval.precision);
- calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
- pFillSup->numOfAllCols, numOfWins);
+ calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
+ pFillSup->numOfAllCols);
pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false;
} else {
@@ -806,10 +799,8 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
- int32_t numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding,
- pFillSup->interval.slidingUnit, pFillSup->interval.precision);
- calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
- pFillSup->numOfAllCols, numOfWins);
+ calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
+ pFillSup->numOfAllCols);
pFillInfo->pResRow = &pFillSup->cur;
pFillInfo->pLinearInfo->hasNext = false;
}
@@ -906,13 +897,18 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
colDataSetNULL(pColData, index);
continue;
}
- double* pDelta = taosArrayGet(pFillInfo->pLinearInfo->pDeltaVal, slotId);
+ SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId);
double vCell = 0;
- GET_TYPED_DATA(vCell, double, pCell->type, pCell->pData);
- vCell += (*pDelta) * pFillInfo->pLinearInfo->winIndex;
- int64_t result = 0;
- SET_TYPED_DATA(&result, pCell->type, vCell);
- colDataSetVal(pColData, index, (const char*)&result, false);
+ SPoint start = {0};
+ start.key = pFillInfo->pResRow->key;
+ start.val = pCell->pData;
+
+ SPoint cur = {0};
+ cur.key = pFillInfo->current;
+ cur.val = taosMemoryCalloc(1, pCell->bytes);
+ taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
+ colDataSetVal(pColData, index, (const char*)cur.val, false);
+ destroySPoint(&cur);
}
}
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
@@ -953,8 +949,7 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->winIndex = 0;
- taosArrayClear(pFillInfo->pLinearInfo->pDeltaVal);
- taosArrayAddAll(pFillInfo->pLinearInfo->pDeltaVal, pFillInfo->pLinearInfo->pNextDeltaVal);
+ taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
pFillInfo->pResRow = &pFillSup->cur;
setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
doStreamFillLinear(pFillSup, pFillInfo, pRes);
@@ -1359,15 +1354,19 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
- pFillInfo->pLinearInfo->pDeltaVal = NULL;
- pFillInfo->pLinearInfo->pNextDeltaVal = NULL;
+ pFillInfo->pLinearInfo->pEndPoints = NULL;
+ pFillInfo->pLinearInfo->pNextEndPoints = NULL;
if (pFillSup->type == TSDB_FILL_LINEAR) {
- pFillInfo->pLinearInfo->pDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double));
- pFillInfo->pLinearInfo->pNextDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double));
+ pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
+ pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
- double value = 0.0;
- taosArrayPush(pFillInfo->pLinearInfo->pDeltaVal, &value);
- taosArrayPush(pFillInfo->pLinearInfo->pNextDeltaVal, &value);
+ SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
+ SPoint value = {0};
+ value.val = taosMemoryCalloc(1, pColData->info.bytes);
+ taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
+
+ value.val = taosMemoryCalloc(1, pColData->info.bytes);
+ taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
}
}
pFillInfo->pLinearInfo->winIndex = 0;
diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c
index fdce9a95df..5ca82808de 100644
--- a/source/libs/executor/src/projectoperator.c
+++ b/source/libs/executor/src/projectoperator.c
@@ -161,10 +161,9 @@ static int32_t discardGroupDataBlock(SSDataBlock* pBlock, SLimitInfo* pLimitInfo
if (pLimitInfo->remainGroupOffset > 0) {
return PROJECT_RETRIEVE_CONTINUE;
}
- }
- // set current group id of the project operator
- pLimitInfo->currentGroupId = pBlock->info.id.groupId;
+ pLimitInfo->currentGroupId = 0;
+ }
}
return PROJECT_RETRIEVE_DONE;
@@ -175,19 +174,29 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
// here check for a new group data, we need to handle the data of the previous group.
ASSERT(pLimitInfo->remainGroupOffset == 0 || pLimitInfo->remainGroupOffset == -1);
- if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
+ bool newGroup = false;
+ if (0 == pBlock->info.id.groupId) {
+ pLimitInfo->numOfOutputGroups = 1;
+ } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
+ pLimitInfo->currentGroupId = pBlock->info.id.groupId;
pLimitInfo->numOfOutputGroups += 1;
- if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
- setOperatorCompleted(pOperator);
- return PROJECT_RETRIEVE_DONE;
- }
-
- // reset the value for a new group data
- // existing rows that belongs to previous group.
- resetLimitInfoForNextGroup(pLimitInfo);
+ newGroup = true;
+ } else {
+ return PROJECT_RETRIEVE_CONTINUE;
}
- return PROJECT_RETRIEVE_DONE;
+ if ((pLimitInfo->slimit.limit >= 0) && (pLimitInfo->slimit.limit < pLimitInfo->numOfOutputGroups)) {
+ setOperatorCompleted(pOperator);
+ return PROJECT_RETRIEVE_DONE;
+ }
+
+ // reset the value for a new group data
+ // existing rows that belongs to previous group.
+ if (newGroup) {
+ resetLimitInfoForNextGroup(pLimitInfo);
+ }
+
+ return PROJECT_RETRIEVE_CONTINUE;
}
// todo refactor
@@ -199,7 +208,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
if (pBlock->info.rows == 0) {
return PROJECT_RETRIEVE_CONTINUE;
} else {
- if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
+ if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
setOperatorCompleted(pOperator);
}
}
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 31f022f368..2be6fd87ca 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -1775,6 +1775,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
+ printDataBlock(pInfo->pUpdateRes, "recover update");
return pInfo->pUpdateRes;
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
@@ -1785,7 +1786,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false);
- // printDataBlock(pSDB, "stream scan update");
+ printDataBlock(pSDB, "scan recover update");
calBlockTbName(pInfo, pSDB);
return pSDB;
}
@@ -1810,6 +1811,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
+ printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
return pInfo->pCreateTbRes;
}
qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index fef588a503..b01143841c 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -2110,10 +2110,12 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
} else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
- qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
- pTaskInfo->code = code;
- T_LONG_JMP(pTaskInfo->env, code);
+ qError("%s apply combine functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
}
+ } else if (pDestCtx[k].fpSet.combine == NULL) {
+ char* funName = fmGetFuncName(pDestCtx[k].functionId);
+ qError("%s error, combine funcion for %s is not implemented", GET_TASKID(pTaskInfo), funName);
+ taosMemoryFreeClear(funName);
}
}
}
@@ -2769,6 +2771,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
+ pInfo->ignoreExpiredDataSaved = false;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
@@ -3587,6 +3590,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
+ pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL;
pInfo->pStUpdated = NULL;
@@ -4112,6 +4116,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
+ pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL;
pInfo->pSeUpdated = NULL;
@@ -4885,6 +4890,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->interval = interval;
pInfo->twAggSup = twAggSupp;
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
+ pInfo->ignoreExpiredDataSaved = false;
pInfo->isFinal = false;
SExprSupp* pSup = &pOperator->exprSupp;
diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h
index dc884a0581..c3afc30a7b 100644
--- a/source/libs/function/inc/builtinsimpl.h
+++ b/source/libs/function/inc/builtinsimpl.h
@@ -235,6 +235,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t groupKeyFunction(SqlFunctionCtx* pCtx);
int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
+int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
#ifdef __cplusplus
}
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index 0257b3d5e6..4fcc9ac1ad 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -2480,7 +2480,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "irate",
.type = FUNCTION_TYPE_IRATE,
- .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
+ .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateIrate,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
@@ -3234,6 +3234,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = groupKeyFunction,
.finalizeFunc = groupKeyFinalize,
+ .combineFunc = groupKeyCombine,
.pPartialFunc = "_group_key",
.pMergeFunc = "_group_key"
},
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index 35cfa1753f..3c9f2fe8ca 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -5900,6 +5900,39 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
+int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
+ SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
+ SGroupKeyInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
+
+ SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
+ SGroupKeyInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
+
+ // escape rest of data blocks to avoid first entry to be overwritten.
+ if (pDBuf->hasResult) {
+ goto _group_key_over;
+ }
+
+ if (pSBuf->isNull) {
+ pDBuf->isNull = true;
+ pDBuf->hasResult = true;
+ goto _group_key_over;
+ }
+
+ if (IS_VAR_DATA_TYPE(pSourceCtx->resDataInfo.type)) {
+ memcpy(pDBuf->data, pSBuf->data,
+ (pSourceCtx->resDataInfo.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(pSBuf->data) : varDataTLen(pSBuf->data));
+ } else {
+ memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes);
+ }
+
+ pDBuf->hasResult = true;
+
+_group_key_over:
+
+ SET_VAL(pDResInfo, 1, 1);
+ return TSDB_CODE_SUCCESS;
+}
+
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c
index e3127fcd7b..94ab616dda 100644
--- a/source/libs/function/src/functionMgt.c
+++ b/source/libs/function/src/functionMgt.c
@@ -447,3 +447,10 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc
return code;
}
+
+char* fmGetFuncName(int32_t funcId) {
+ if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
+ return taosStrdup("invalid function");
+ }
+ return taosStrdup(funcMgtBuiltins[funcId].name);
+}
diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c
index b2fc88add1..33b8a686ae 100644
--- a/source/libs/parser/src/parCalcConst.c
+++ b/source/libs/parser/src/parCalcConst.c
@@ -361,11 +361,29 @@ static bool notRefByOrderBy(SColumnNode* pCol, SNodeList* pOrderByList) {
return !cxt.hasThisCol;
}
+static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pProj) {
+ if (!isUselessCol(pProj)) {
+ return false;
+ }
+
+ SNodeList* pLeftProjs = getChildProjection(pSetOp->pLeft);
+ if (!isUselessCol((SExprNode*)nodesListGetNode(pLeftProjs, index))) {
+ return false;
+ }
+
+ SNodeList* pRightProjs = getChildProjection(pSetOp->pRight);
+ if (!isUselessCol((SExprNode*)nodesListGetNode(pRightProjs, index))) {
+ return false;
+ }
+
+ return true;
+}
+
static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) {
int32_t index = 0;
SNode* pProj = NULL;
WHERE_EACH(pProj, pSetOp->pProjectionList) {
- if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && isUselessCol((SExprNode*)pProj)) {
+ if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) {
ERASE_NODE(pSetOp->pProjectionList);
eraseSetOpChildProjection(pSetOp, index);
continue;
diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c
index b0ab9ddedf..ce1213609e 100644
--- a/source/libs/parser/src/parInsertUtil.c
+++ b/source/libs/parser/src/parInsertUtil.c
@@ -566,53 +566,19 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
return code;
}
-static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
- bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
- if (NULL == pUseCols) {
- return TSDB_CODE_OUT_OF_MEMORY;
- }
-
- pBoundInfo->numOfBound = 0;
-
- int16_t lastColIdx = -1; // last column found
- int32_t code = TSDB_CODE_SUCCESS;
+static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
for (int i = 0; i < numFields; i++) {
- SToken token;
- token.z = fields[i].name;
- token.n = strlen(fields[i].name);
-
- int16_t t = lastColIdx + 1;
- int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
- if (index < 0 && t > 0) {
- index = insFindCol(&token, 0, t, pSchema);
- }
- if (index < 0) {
- uError("can not find column name:%s", token.z);
- code = TSDB_CODE_PAR_INVALID_COLUMN;
- break;
- } else if (pUseCols[index]) {
- code = TSDB_CODE_PAR_INVALID_COLUMN;
- uError("duplicated column name:%s", token.z);
- break;
- } else {
- lastColIdx = index;
- pUseCols[index] = true;
- pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
- ++pBoundInfo->numOfBound;
+ if(strcmp(pSchema->name, fields[i].name) == 0){
+ return true;
}
}
- if (TSDB_CODE_SUCCESS == code && !pUseCols[0]) {
- uError("primary timestamp column can not be null:");
- code = TSDB_CODE_PAR_INVALID_COLUMN;
- }
-
- taosMemoryFree(pUseCols);
- return code;
+ return false;
}
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields,
int numFields, bool needChangeLength) {
+ void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true);
@@ -620,19 +586,14 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
uError("insGetTableDataCxt error");
goto end;
}
- if (tFields != NULL) {
- ret = bindFileds(&pTableCxt->boundColsInfo, getTableColumnSchema(pTableMeta), tFields, numFields);
+
+ if(tmp == NULL){
+ ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) {
- uError("bindFileds error");
+ uError("initTableColSubmitData error");
goto end;
}
}
- // no need to bind, because select * get all fields
- ret = initTableColSubmitData(pTableCxt);
- if (ret != TSDB_CODE_SUCCESS) {
- uError("initTableColSubmitData error");
- goto end;
- }
char* p = (char*)data;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
@@ -660,35 +621,43 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta);
SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
- if (boundInfo->numOfBound != numOfCols) {
- uError("boundInfo->numOfBound:%d != numOfCols:%d", boundInfo->numOfBound, numOfCols);
+ if (numFields != numOfCols) {
+ uError("numFields:%d != numOfCols:%d", numFields, numOfCols);
+ ret = TSDB_CODE_INVALID_PARA;
+ goto end;
+ }
+ if (numFields > boundInfo->numOfBound) {
+ uError("numFields:%d > boundInfo->numOfBound:%d", numFields, boundInfo->numOfBound);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
for (int c = 0; c < boundInfo->numOfBound; ++c) {
- SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
+ SSchema* pColSchema = &pSchema[c];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
+ if(findFileds(pColSchema, tFields, numFields)){
+ if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
+ uError("type or bytes not equal");
+ ret = TSDB_CODE_INVALID_PARA;
+ goto end;
+ }
- if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
- uError("type or bytes not equal");
- ret = TSDB_CODE_INVALID_PARA;
- goto end;
- }
+ int8_t* offset = pStart;
+ if (IS_VAR_DATA_TYPE(pColSchema->type)) {
+ pStart += numOfRows * sizeof(int32_t);
+ } else {
+ pStart += BitmapLen(numOfRows);
+ }
+ char* pData = pStart;
- int8_t* offset = pStart;
- if (IS_VAR_DATA_TYPE(pColSchema->type)) {
- pStart += numOfRows * sizeof(int32_t);
- } else {
- pStart += BitmapLen(numOfRows);
- }
- char* pData = pStart;
-
- tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
- fields += sizeof(int8_t) + sizeof(int32_t);
- if (needChangeLength) {
- pStart += htonl(colLength[c]);
- } else {
- pStart += colLength[c];
+ tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
+ fields += sizeof(int8_t) + sizeof(int32_t);
+ if (needChangeLength) {
+ pStart += htonl(colLength[c]);
+ } else {
+ pStart += colLength[c];
+ }
+ }else{
+ tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, NULL, NULL);
}
}
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index a3c79b5757..68c93f8ca9 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -1422,6 +1422,9 @@ static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (isCountStar(pFunc)) {
return rewriteCountStar(pCxt, pFunc);
}
+ if (isCountNotNullValue(pFunc)) {
+ return rewriteCountNotNullValue(pCxt, pFunc);
+ }
if (isCountTbname(pFunc)) {
return rewriteCountTbname(pCxt, pFunc);
}
diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c
index 793d05721e..563bc5e780 100644
--- a/source/libs/parser/src/parUtil.c
+++ b/source/libs/parser/src/parUtil.c
@@ -444,7 +444,7 @@ static int32_t getInsTagsTableTargetNameFromOp(int32_t acctId, SOperatorNode* pO
} else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) {
pVal = (SValueNode*)pOper->pRight;
}
- if (NULL == pCol || NULL == pVal) {
+ if (NULL == pCol || NULL == pVal || NULL == pVal->literal || 0 == strcmp(pVal->literal, "")) {
return TSDB_CODE_SUCCESS;
}
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index cb610ad6b5..25b2656365 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -152,8 +152,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (batchCnt >= batchSz) break;
}
if (taosArrayGetSize(pRes) == 0) {
- taosArrayDestroy(pRes);
- break;
+ if (finished) {
+ taosArrayDestroy(pRes);
+ qDebug("task %d finish recover exec task ", pTask->taskId);
+ break;
+ } else {
+ qDebug("task %d continue recover exec task ", pTask->taskId);
+ continue;
+ }
}
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c
index 29b7fa740c..5ea9be63db 100644
--- a/source/libs/tdb/src/db/tdbPager.c
+++ b/source/libs/tdb/src/db/tdbPager.c
@@ -947,6 +947,12 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
return 0;
}
+static int32_t txnIdCompareDesc(const void *pLeft, const void *pRight) {
+ int64_t lhs = *(int64_t *)pLeft;
+ int64_t rhs = *(int64_t *)pRight;
+ return lhs > rhs ? -1 : 1;
+}
+
int tdbPagerRestoreJournals(SPager *pPager) {
tdbDirEntryPtr pDirEntry;
tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName);
@@ -955,23 +961,33 @@ int tdbPagerRestoreJournals(SPager *pPager) {
return -1;
}
+ SArray *pTxnList = taosArrayInit(16, sizeof(int64_t));
+
while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) {
- char jname[TD_PATH_MAX] = {0};
- int dirLen = strlen(pPager->pEnv->dbName);
- memcpy(jname, pPager->pEnv->dbName, dirLen);
- jname[dirLen] = '/';
- memcpy(jname + dirLen + 1, name, strlen(name));
- if (tdbPagerRestore(pPager, jname) < 0) {
- tdbCloseDir(&pDir);
+ int64_t txnId = -1;
+ sscanf(name, TDB_MAINDB_NAME "-journal.%" PRId64, &txnId);
+ taosArrayPush(pTxnList, &txnId);
+ }
+ }
+ taosArraySort(pTxnList, txnIdCompareDesc);
+ for (int i = 0; i < TARRAY_SIZE(pTxnList); ++i) {
+ int64_t *pTxnId = taosArrayGet(pTxnList, i);
+ char jname[TD_PATH_MAX] = {0};
+ int dirLen = strlen(pPager->pEnv->dbName);
+ memcpy(jname, pPager->pEnv->dbName, dirLen);
+ jname[dirLen] = '/';
+ sprintf(jname + dirLen + 1, TDB_MAINDB_NAME "-journal.%" PRId64, *pTxnId);
+ if (tdbPagerRestore(pPager, jname) < 0) {
+ tdbCloseDir(&pDir);
- tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name);
- return -1;
- }
+ tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname);
+ return -1;
}
}
+ taosArrayDestroy(pTxnList);
tdbCloseDir(&pDir);
return 0;
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index 8c8d41d5b6..58fa0cbfab 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -871,6 +871,7 @@
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
,,y,script,./test.sh -f tsim/query/partitionby.sim
,,y,script,./test.sh -f tsim/query/tableCount.sim
+,,y,script,./test.sh -f tsim/query/nullColSma.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim
diff --git a/tests/script/coverage_test.sh b/tests/script/coverage_test.sh
index 3f99dde544..ad87528f9c 100755
--- a/tests/script/coverage_test.sh
+++ b/tests/script/coverage_test.sh
@@ -12,6 +12,7 @@ fi
today=`date +"%Y%m%d"`
TDENGINE_DIR=/root/TDengine
JDBC_DIR=/root/taos-connector-jdbc
+TAOSKEEPER_DIR=/root/taoskeeper
TDENGINE_COVERAGE_REPORT=$TDENGINE_DIR/tests/coverage-report-$today.log
# Color setting
@@ -171,6 +172,24 @@ function runJDBCCases() {
echo -e "### JDBC test result: $summary ###" | tee -a $TDENGINE_COVERAGE_REPORT
}
+function runTaosKeeperCases() {
+ echo "=== Run taoskeeper cases ==="
+
+ cd $TAOSKEEPER_DIR
+ git checkout -- .
+ git reset --hard HEAD
+ git checkout master
+ git pull
+
+ stopTaosd
+ stopTaosadapter
+
+ taosd -c /etc/taos >> /dev/null 2>&1 &
+ taosadapter >> /dev/null 2>&1 &
+
+ go mod tidy && go test -v ./...
+}
+
function runTest() {
echo "run Test"
@@ -182,6 +201,7 @@ function runTest() {
runSimCases
runPythonCases
runJDBCCases
+ runTaosKeeperCases
stopTaosd
cd $TDENGINE_DIR/tests/script
@@ -199,7 +219,7 @@ function lcovFunc {
lcov -d . --capture --rc lcov_branch_coverage=1 --rc genhtml_branch_coverage=1 --no-external -b $TDENGINE_DIR -o coverage.info
# remove exclude paths
- if [ "$branch" == "3.0" ]; then
+ if [ "$branch" == "main" ] ; then
lcov --remove coverage.info \
'*/contrib/*' '*/tests/*' '*/test/*' '*/tools/*' '*/libs/sync/*'\
'*/AccessBridgeCalls.c' '*/ttszip.c' '*/dataInserter.c' '*/tlinearhash.c' '*/tsimplehash.c' '*/tsdbDiskData.c'\
@@ -209,6 +229,8 @@ function lcovFunc {
'*/tthread.c' '*/tversion.c' '*/ctgDbg.c' '*/schDbg.c' '*/qwDbg.c' '*/tencode.h' '*/catalog.c'\
'*/tqSnapshot.c' '*/tsdbSnapshot.c''*/metaSnapshot.c' '*/smaSnapshot.c' '*/tqOffsetSnapshot.c'\
'*/vnodeSnapshot.c' '*/metaSnapshot.c' '*/tsdbSnapshot.c' '*/mndGrant.c' '*/mndSnode.c' '*/streamRecover.c'\
+ '*/osAtomic.c' '*/osDir.c' '*/osFile.c' '*/osMath.c' '*/osSignal.c' '*/osSleep.c' '*/osString.c' '*/osSystem.c'\
+ '*/osThread.c' '*/osTime.c' '*/osTimezone.c' \
--rc lcov_branch_coverage=1 -o coverage.info
else
lcov --remove coverage.info \
diff --git a/tests/script/tsim/parser/limit1_stb.sim b/tests/script/tsim/parser/limit1_stb.sim
index 1a5d57efbc..731a218de5 100644
--- a/tests/script/tsim/parser/limit1_stb.sim
+++ b/tests/script/tsim/parser/limit1_stb.sim
@@ -484,6 +484,7 @@ if $rows != 2 then
return -1
endi
+print === select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) limit 1 offset 0
sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) limit 1 offset 0
if $rows != 3 then
return -1
diff --git a/tests/script/tsim/parser/slimit1_query.sim b/tests/script/tsim/parser/slimit1_query.sim
index 9a27d5523b..1167fe0b3d 100644
--- a/tests/script/tsim/parser/slimit1_query.sim
+++ b/tests/script/tsim/parser/slimit1_query.sim
@@ -70,7 +70,7 @@ endi
### empty result set
sql select count(*) from stb partition by t2,t1 order by t2 asc slimit 0 soffset 0
-if $rows != 9 then
+if $rows != 0 then
return -1
endi
diff --git a/tests/script/tsim/query/nullColSma.sim b/tests/script/tsim/query/nullColSma.sim
new file mode 100644
index 0000000000..886274e7e5
--- /dev/null
+++ b/tests/script/tsim/query/nullColSma.sim
@@ -0,0 +1,139 @@
+system sh/stop_dnodes.sh
+system sh/deploy.sh -n dnode1 -i 1
+system sh/exec.sh -n dnode1 -s start
+sql connect
+
+$dbPrefix = m_in_db
+$tbPrefix = m_in_tb
+$mtPrefix = m_in_mt
+$tbNum = 1
+$rowNum = 200
+$totalNum = 400
+
+print =============== step1
+$i = 0
+$db = $dbPrefix . $i
+$mt = $mtPrefix . $i
+
+sql drop database if exists $db
+sql create database $db vgroups 1 maxrows 200 minrows 10;
+sql use $db
+sql create table $mt (ts timestamp, f1 int, f2 float) TAGS(tgcol int)
+
+print ====== start create child tables and insert data
+$i = 0
+while $i < $tbNum
+ $tb = $tbPrefix . $i
+ sql create table $tb using $mt tags( $i )
+
+ $x = 0
+ while $x < $rowNum
+ $cc = $x * 1
+ $ms = 1601481600000 + $cc
+
+ sql insert into $tb values ($ms , NULL , $x )
+ $x = $x + 1
+ endw
+
+ $i = $i + 1
+endw
+
+$i = 1
+$tb = $tbPrefix . $i
+sql create table $tb using $mt tags( $i )
+
+$x = 0
+while $x < $rowNum
+ $cc = $x * 1
+ $ms = 1601481600000 + $cc
+
+ sql insert into $tb values ($ms , $x , NULL )
+ $x = $x + 1
+endw
+
+sql flush database $db
+
+print =============== step2
+$i = 0
+$tb = $tbPrefix . $i
+sql select max(f1) from $tb
+if $rows != 1 then
+ return -1
+endi
+if $data00 != NULL then
+ return -1
+endi
+
+$i = 1
+$tb = $tbPrefix . $i
+sql select max(f2) from $tb
+if $rows != 1 then
+ return -1
+endi
+if $data00 != NULL then
+ return -1
+endi
+
+$rowNum = 10
+
+print ====== insert more data
+$i = 0
+while $i < $tbNum
+ $tb = $tbPrefix . $i
+
+ $x = 0
+ while $x < $rowNum
+ $cc = $x * 1
+ $ms = 1601481700000 + $cc
+
+ sql insert into $tb values ($ms , $x , $x )
+ $x = $x + 1
+ endw
+
+ $i = $i + 1
+endw
+
+$i = 1
+$tb = $tbPrefix . $i
+$x = 0
+while $x < $rowNum
+ $cc = $x * 1
+ $ms = 1601481700000 + $cc
+
+ sql insert into $tb values ($ms , $x , $x )
+ $x = $x + 1
+endw
+
+sql flush database $db
+
+print =============== step3
+$i = 0
+$tb = $tbPrefix . $i
+sql select max(f1) from $tb
+if $rows != 1 then
+ return -1
+endi
+if $data00 != 9 then
+ return -1
+endi
+
+$i = 1
+$tb = $tbPrefix . $i
+sql select max(f2) from $tb
+if $rows != 1 then
+ return -1
+endi
+if $data00 != 9.00000 then
+ print $data00
+ return -1
+endi
+
+
+print =============== clear
+#sql drop database $db
+#sql select * from information_schema.ins_databases
+#if $rows != 0 then
+# return -1
+#endi
+
+system sh/exec.sh -n dnode1 -s stop -x SIGINT
diff --git a/tests/script/tsim/query/sys_tbname.sim b/tests/script/tsim/query/sys_tbname.sim
index 7b3953129a..c676f2b1e0 100644
--- a/tests/script/tsim/query/sys_tbname.sim
+++ b/tests/script/tsim/query/sys_tbname.sim
@@ -51,6 +51,11 @@ if $data00 != @ins_stables@ then
return -1
endi
+sql select * from information_schema.ins_tables where table_name='';
+if $rows != 0 then
+ return -1
+endi
+
sql select tbname from information_schema.ins_tables;
print $rows $data00
if $rows != 33 then
diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim
index 1d58922928..de1c72f8dc 100644
--- a/tests/script/tsim/stream/distributeInterval0.sim
+++ b/tests/script/tsim/stream/distributeInterval0.sim
@@ -272,4 +272,122 @@ if $data12 != 2 then
goto loop3
endi
+print ===== step3
+
+sql drop database if exists test4;
+sql create database test4 vgroups 10;
+sql use test4;
+sql create stable st(ts timestamp,a int,b int,c varchar(250) ) tags(ta int,tb int,tc int);
+sql create table aaa using st tags(1,1,1);
+sql create table bbb using st tags(2,2,2);
+sql create table ccc using st tags(3,2,2);
+sql create table ddd using st tags(4,2,2);
+
+
+sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1 from st partition by c interval(1s) ;
+
+sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa");
+sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa");
+sql insert into ccc values(1648791221001,2,2,"/a1/aa/aa");
+sql insert into ddd values(1648791221001,2,2,"/a1/aa/aa");
+
+sql insert into aaa values(1648791222002,2,2,"/a2/aa/aa");
+sql insert into bbb values(1648791222002,2,2,"/a2/aa/aa");
+sql insert into ccc values(1648791222002,2,2,"/a2/aa/aa");
+sql insert into ddd values(1648791222002,2,2,"/a2/aa/aa");
+
+sql insert into aaa values(1648791223003,2,2,"/a3/aa/aa");
+sql insert into bbb values(1648791223003,2,2,"/a3/aa/aa");
+sql insert into ccc values(1648791223003,2,2,"/a3/aa/aa");
+sql insert into ddd values(1648791223003,2,2,"/a3/aa/aa");
+
+sql insert into aaa values(1648791224003,2,2,"/a4/aa/aa");
+sql insert into bbb values(1648791224003,2,2,"/a4/aa/aa");
+sql insert into ccc values(1648791224003,2,2,"/a4/aa/aa");
+sql insert into ddd values(1648791224003,2,2,"/a4/aa/aa");
+
+
+sql insert into aaa values(1648791225003,2,2,"/a5/aa/aa");
+sql insert into bbb values(1648791225003,2,2,"/a5/aa/aa");
+sql insert into ccc values(1648791225003,2,2,"/a5/aa/aa");
+sql insert into ddd values(1648791225003,2,2,"/a5/aa/aa");
+
+sql insert into aaa values(1648791226003,2,2,"/a6/aa/aa");
+sql insert into bbb values(1648791226003,2,2,"/a6/aa/aa");
+sql insert into ccc values(1648791226003,2,2,"/a6/aa/aa");
+sql insert into ddd values(1648791226003,2,2,"/a6/aa/aa");
+
+$loop_count = 0
+
+loop4:
+sleep 200
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+sql select * from streamst;
+
+if $rows == 0 then
+ goto loop4
+endi
+
+sql delete from aaa where ts = 1648791223003 ;
+
+$loop_count = 0
+
+loop5:
+sleep 200
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+sql select * from streamst;
+
+if $rows == 0 then
+ goto loop5
+endi
+
+
+sql delete from ccc;
+
+$loop_count = 0
+
+loop6:
+sleep 200
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+sql select * from streamst;
+
+if $rows == 0 then
+ goto loop6
+endi
+
+sql delete from ddd;
+
+$loop_count = 0
+
+loop7:
+sleep 200
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+sql select * from streamst;
+
+if $rows == 0 then
+ goto loop7
+endi
+
+print ===== over
+
system sh/stop_dnodes.sh
diff --git a/tests/system-test/0-others/check_assert.py b/tests/system-test/0-others/check_assert.py
index 59fb223528..ff69b9eeec 100644
--- a/tests/system-test/0-others/check_assert.py
+++ b/tests/system-test/0-others/check_assert.py
@@ -28,10 +28,10 @@ import os
NO_FOUND = 0 # not found assert or ASSERT
FOUND_OK = 1 # found ASSERT and valid usage
FOUND_NOIF = 2 # found ASSERT but no if like ASSERT(...)
-FOUND_LOWER = 3 # found assert write with lower letters
+FOUND_LOWER = 3 # found assert write with system assert
FOUND_HAVENOT = 4 # found ASSERT have if but have not like if(!ASSERT)
-code_strs = ["not found", "valid", "found but no if", "lower assert","found but have not"]
+code_strs = ["not found", "valid", "found but no if", "system assert","found but have not"]
#
diff --git a/tests/system-test/7-tmq/subscribeStb1.py b/tests/system-test/7-tmq/subscribeStb1.py
index 3941e3a9a8..3dc3528d04 100644
--- a/tests/system-test/7-tmq/subscribeStb1.py
+++ b/tests/system-test/7-tmq/subscribeStb1.py
@@ -245,8 +245,8 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
- if totalConsumeRows < expectrowcnt/4:
- tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
+ tdLog.info("act consume rows: %d, expect consume rows greater than or equal to: %d"%(totalConsumeRows, expectrowcnt/4))
+ if totalConsumeRows < expectrowcnt/4:
tdLog.exit("tmq consume rows error!")
self.initConsumerInfoTable()
@@ -267,8 +267,8 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
- if totalConsumeRows < expectrowcnt:
- tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
+ tdLog.info("act consume rows: %d, expect consume rows greater than or equal to: %d"%(totalConsumeRows, expectrowcnt))
+ if totalConsumeRows < expectrowcnt:
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
diff --git a/tests/system-test/7-tmq/tmqDelete-1ctb.py b/tests/system-test/7-tmq/tmqDelete-1ctb.py
index b09efdd1e6..69d2f5e347 100644
--- a/tests/system-test/7-tmq/tmqDelete-1ctb.py
+++ b/tests/system-test/7-tmq/tmqDelete-1ctb.py
@@ -323,7 +323,7 @@ class TDTestCase:
if self.snapshot == 0:
consumerId = 4
- expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4))
+ expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1/4 + 3/4))
elif self.snapshot == 1:
consumerId = 5
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
@@ -369,9 +369,14 @@ class TDTestCase:
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
if self.snapshot == 0:
- if totalConsumeRows != expectrowcnt:
+ # If data writing is completed before consumer get snapshot, will consume 7500 from wal;
+ # If data writing has not started before consumer get snapshot, will consume 10000 from wal;
+ minRows = int(expectrowcnt * (1 - 1/4)) # 7500
+ tdLog.info("consume rows should be between %d and %d, "%(minRows, expectrowcnt))
+ if not ((totalConsumeRows >= minRows) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
elif self.snapshot == 1:
+ tdLog.info("consume rows should be between %d and %d, "%(totalRowsFromQuery, expectrowcnt))
if not ((totalConsumeRows >= totalRowsFromQuery) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 1!")
@@ -381,7 +386,113 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 3 end ...... ")
+ def tmqCase4(self):
+ tdLog.printNoPrefix("======== test case 4: ")
+ paraDict = {'dbName': 'dbt',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 4,
+ 'stbName': 'stb',
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbStartIdx': 0,
+ 'ctbNum': 1,
+ 'rowsPerTbl': 10000,
+ 'batchNum': 5000,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ 'pollDelay': 15,
+ 'showMsg': 1,
+ 'showRow': 1,
+ 'snapshot': 0}
+ paraDict['snapshot'] = self.snapshot
+ paraDict['vgroups'] = self.vgroups
+ paraDict['ctbNum'] = self.ctbNum
+ paraDict['rowsPerTbl'] = self.rowsPerTbl
+
+ tdLog.info("restart taosd to ensure that the data falls into the disk")
+ tdSql.query("flush database %s"%(paraDict['dbName']))
+
+ tmqCom.initConsumerTable()
+ tdLog.info("create topics from stb1")
+ topicFromStb1 = 'topic_stb1'
+ queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
+ sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
+ tdLog.info("create topic sql: %s"%sqlString)
+ tdSql.execute(sqlString)
+
+ # paraDict['ctbNum'] = self.ctbNum
+ paraDict['rowsPerTbl'] = self.rowsPerTbl
+ consumerId = 1
+
+ if self.snapshot == 0:
+ consumerId = 4
+ expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
+ elif self.snapshot == 1:
+ consumerId = 5
+ expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
+
+ topicList = topicFromStb1
+ ifcheckdata = 1
+ ifManualCommit = 1
+ keyList = 'group.id:cgrp1,\
+ enable.auto.commit:true,\
+ auto.commit.interval.ms:1000,\
+ auto.offset.reset:earliest'
+ tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ # del some data
+ rowsOfDelete = int(self.rowsPerTbl / 4 )
+ paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1
+ # pDeleteThread = self.asyncDeleteData(paraDict)
+ self.threadFunctionForDeletaData(paraDict)
+
+ tdLog.info("start consume processor")
+ tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
+
+ tmqCom.getStartConsumeNotifyFromTmqsim()
+
+ # update to 1/4 rows and insert 3/4 new rows
+ paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4)
+ # paraDict['rowsPerTbl'] = self.rowsPerTbl
+ # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
+ # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
+ # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
+ pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
+
+ pInsertThread.join()
+
+ tdLog.info("start to check consume result")
+ expectRows = 1
+ resultList = tmqCom.selectConsumeResult(expectRows)
+ totalConsumeRows = 0
+ for i in range(expectRows):
+ totalConsumeRows += resultList[i]
+
+ tdSql.query(queryString)
+ totalRowsFromQuery = tdSql.getRows()
+
+ tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
+
+ if self.snapshot == 0:
+ tdLog.info("consume rows should be %d"%(expectrowcnt))
+ if (totalConsumeRows != expectrowcnt):
+ tdLog.exit("tmq consume rows error with snapshot = 0!")
+ elif self.snapshot == 1:
+ # If data writing has not started before consumer get snapshot, will consume 10000 from wal, and consumer 7500 from tsdb;
+ tdLog.info("consume rows should be %d, "%(expectrowcnt))
+ if (totalConsumeRows != expectrowcnt):
+ tdLog.exit("tmq consume rows error with snapshot = 1!")
+
+ # tmqCom.checkFileContent(consumerId, queryString)
+
+ tdSql.query("drop topic %s"%topicFromStb1)
+
+ tdLog.printNoPrefix("======== test case 4 end ...... ")
+
def run(self):
# tdSql.prepare()
tdLog.printNoPrefix("=============================================")
@@ -409,6 +520,17 @@ class TDTestCase:
self.prepareTestEnv()
self.tmqCase3()
+ tdLog.printNoPrefix("=============================================")
+ tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
+ self.snapshot = 0
+ self.prepareTestEnv()
+ self.tmqCase4()
+ tdLog.printNoPrefix("====================================================================")
+ tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
+ self.snapshot = 1
+ self.prepareTestEnv()
+ self.tmqCase4()
+
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py
index a44ff916e5..8c19377c0b 100644
--- a/tests/system-test/7-tmq/tmqDnodeRestart.py
+++ b/tests/system-test/7-tmq/tmqDnodeRestart.py
@@ -136,7 +136,7 @@ class TDTestCase:
tdLog.info("================= restart dnode ===========================")
tdDnodes.stoptaosd(1)
tdDnodes.starttaosd(1)
- # time.sleep(3)
+ time.sleep(5)
tdLog.info(" restart taosd end and wait to check consume result")
expectRows = 1
@@ -220,7 +220,7 @@ class TDTestCase:
tdLog.info("================= restart dnode ===========================")
tdDnodes.stoptaosd(1)
tdDnodes.starttaosd(1)
- # time.sleep(3)
+ time.sleep(5)
tdLog.info("create some new child table and insert data ")
paraDict["batchNum"] = 100
diff --git a/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py b/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py
index 6c49fae299..c8bcdd6235 100644
--- a/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py
+++ b/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py
@@ -100,7 +100,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
- if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)):
+ if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
@@ -131,7 +131,7 @@ class TDTestCase:
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0,
- 'pollDelay': 10,
+ 'pollDelay': 20,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
@@ -193,7 +193,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
- if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)):
+ if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
diff --git a/tests/system-test/7-tmq/tmqMultiConsumer.py b/tests/system-test/7-tmq/tmqMultiConsumer.py
new file mode 100644
index 0000000000..cc217e0c4c
--- /dev/null
+++ b/tests/system-test/7-tmq/tmqMultiConsumer.py
@@ -0,0 +1,174 @@
+
+import taos
+import sys
+import time
+import socket
+import os
+import threading
+import math
+# from tests.pytest.util.common import TDCom
+# from tests.pytest.util.log import TDLog
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+from util.common import *
+sys.path.append("./7-tmq")
+from tmqCommon import *
+
+class TDTestCase:
+ def __init__(self):
+ self.vgroups = 32
+ self.ctbNum = 100
+ self.rowsPerTbl = 1000
+ self.snapshot = 1
+ self.replicaVar = 3
+
+ def init(self, conn, logSql, replicaVar=1):
+ self.replicaVar = int(replicaVar)
+ tdLog.debug(f"start to excute {__file__}")
+ tdSql.init(conn.cursor(), False)
+
+ def prepareTestEnv(self):
+ tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
+ paraDict = {'dbName': 'dbt',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 4,
+ 'stbName': 'stb',
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbStartIdx': 0,
+ 'ctbNum': 10,
+ 'rowsPerTbl': 10000,
+ 'batchNum': 1000,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ 'pollDelay': 10,
+ 'showMsg': 1,
+ 'showRow': 1,
+ 'snapshot': 0}
+
+ paraDict['vgroups'] = self.vgroups
+ paraDict['ctbNum'] = self.ctbNum
+ paraDict['rowsPerTbl'] = self.rowsPerTbl
+
+ tmqCom.initConsumerTable()
+ tmqCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replicaVar)
+ tdLog.info("create stb")
+ tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
+ tdLog.info("create ctb")
+ tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
+ ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
+ tdLog.info("insert data")
+ tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
+ ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
+ startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
+
+ # tdLog.info("restart taosd to ensure that the data falls into the disk")
+ # tdDnodes.stop(1)
+ # tdDnodes.start(1)
+ # tdSql.query("flush database %s"%(paraDict['dbName']))
+ return
+
+ def tmqCase1(self):
+ tdLog.printNoPrefix("======== test case 1: ")
+ paraDict = {'dbName': 'dbt',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 1,
+ 'stbName': 'stb',
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbStartIdx': 0,
+ 'ctbNum': 1,
+ 'rowsPerTbl': 10000,
+ 'batchNum': 1000,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ 'pollDelay': 20,
+ 'showMsg': 1,
+ 'showRow': 1,
+ 'snapshot': 1}
+
+ paraDict['vgroups'] = self.vgroups
+ paraDict['ctbNum'] = self.ctbNum
+ paraDict['rowsPerTbl'] = self.rowsPerTbl
+ paraDict['snapshot'] = self.snapshot
+
+ topicNameList = ['topic1', 'topic2']
+ tmqCom.initConsumerTable()
+
+ tdLog.info("create topics from stb with filter")
+ # queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379)
+ queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s "%(paraDict['dbName'], paraDict['stbName'])
+ # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
+ sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
+ tdLog.info("create topic sql: %s"%sqlString)
+ tdSql.execute(sqlString)
+
+ sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
+ tdLog.info("create topic sql: %s"%sqlString)
+ tdSql.execute(sqlString)
+ # tdSql.query(queryString)
+ # expectRowsList.append(tdSql.getRows())
+
+ # init consume info, and start tmq_sim, then check consume result
+ tdLog.info("insert consume info to consume processor")
+ consumerId = 0
+ expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 4
+ topicList = topicNameList[0] + ',' + topicNameList[0] + ',' + topicNameList[1]
+ ifcheckdata = 0
+ ifManualCommit = 1
+ keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
+ tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ consumerId = 1
+ tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ tdLog.info("start consume processor")
+ tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
+ tdLog.info("wait the consume result")
+
+ # continue to insert new rows
+ paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
+ pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
+ pInsertThread.join()
+
+ expectRows = 2
+ resultList = tmqCom.selectConsumeResult(expectRows)
+ actConsumeTotalRows = resultList[0] + resultList[1]
+
+ tdLog.info("act consume rows: %d, expect consume rows: %d"%(actConsumeTotalRows, expectrowcnt))
+
+ if not ((expectrowcnt <= actConsumeTotalRows) or ((resultList[0] == 0) and (resultList[1] >= expectrowcnt)) or ((resultList[1] == 0) and (resultList[0] >= expectrowcnt))):
+ tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt, actConsumeTotalRows))
+ tdLog.exit("%d tmq consume rows error!"%consumerId)
+
+ # tmqCom.checkFileContent(consumerId, queryString)
+
+ time.sleep(10)
+ for i in range(len(topicNameList)):
+ tdSql.query("drop topic %s"%topicNameList[i])
+
+ tdLog.printNoPrefix("======== test case 1 end ...... ")
+
+
+ def run(self):
+ tdSql.prepare()
+ self.prepareTestEnv()
+ self.tmqCase1()
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+event = threading.Event()
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c
index 3080b15b8c..f9602aaa4d 100644
--- a/tools/shell/src/shellEngine.c
+++ b/tools/shell/src/shellEngine.c
@@ -541,7 +541,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*" PRIu64, width, *((uint64_t *)val));
break;
case TSDB_DATA_TYPE_FLOAT:
- printf("%*.5f", width, GET_FLOAT_VAL(val));
+ printf("%*ef", width, GET_FLOAT_VAL(val));
break;
case TSDB_DATA_TYPE_DOUBLE:
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val));