diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b4674aba54..d78e771fcf 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2807,39 +2807,49 @@ typedef struct { int64_t suid; } SMqRebVgReq; -static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pReq->leftForVer); - tlen += taosEncodeFixedI32(buf, pReq->vgId); - tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); - tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); - tlen += taosEncodeString(buf, pReq->subKey); - tlen += taosEncodeFixedI8(buf, pReq->subType); - tlen += taosEncodeFixedI8(buf, pReq->withMeta); +static FORCE_INLINE int tEncodeSMqRebVgReq(SEncoder *pCoder, const SMqRebVgReq* pReq) { + if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1; + if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1; + if (tEncodeI8(pCoder, pReq->subType) < 0) return -1; + if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1; + if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { - tlen += taosEncodeString(buf, pReq->qmsg); + if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - tlen += taosEncodeFixedI64(buf, pReq->suid); - tlen += taosEncodeString(buf, pReq->qmsg); + if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; } - return tlen; + tEndEncode(pCoder); + return 0; } -static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) { - buf = taosDecodeFixedI64(buf, &pReq->leftForVer); - buf = taosDecodeFixedI32(buf, &pReq->vgId); - buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); - buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); - buf = taosDecodeStringTo(buf, pReq->subKey); - buf = taosDecodeFixedI8(buf, &pReq->subType); - buf = taosDecodeFixedI8(buf, &pReq->withMeta); +static FORCE_INLINE int tDecodeSMqRebVgReq(SDecoder *pCoder, SMqRebVgReq* pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1; + + if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1; + if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1; + if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { - buf = taosDecodeString(buf, &pReq->qmsg); + if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - buf = taosDecodeFixedI64(buf, &pReq->suid); - buf = taosDecodeString(buf, &pReq->qmsg); + if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; + if (!tDecodeIsEnd(pCoder)){ + if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; + } } - return (void*)buf; + + tEndDecode(pCoder); + return 0; } typedef struct { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e62102fa77..74421afa33 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -111,7 +111,14 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri req.suid = pSub->stbUid; tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); - int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); + int32_t tlen = 0; + int32_t ret = 0; + tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret); + if (ret < 0) { + return -1; + } + + tlen += sizeof(SMsgHead); void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -123,8 +130,14 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri pMsgHead->contLen = htonl(tlen); pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId); - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSMqRebVgReq(&abuf, &req); + SEncoder encoder = {0}; + tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen); + if (tEncodeSMqRebVgReq(&encoder, &req) < 0) { + taosMemoryFreeClear(buf); + tEncoderClear(&encoder); + return -1; + } + tEncoderClear(&encoder); *pBuf = buf; *pLen = tlen; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 024d07544f..265e6dbc13 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -23,8 +23,8 @@ static int32_t tqInitialize(STQ* pTq); static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; } -static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;} -static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;} +static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; } +static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; } int32_t tqInit() { int8_t old; @@ -78,7 +78,7 @@ static void destroyTqHandle(void* data) { taosMemoryFreeClear(pData->execHandle.execTb.qmsg); nodesDestroyNode(pData->execHandle.execTb.node); } - if(pData->msg != NULL) { + if (pData->msg != NULL) { rpcFreeCont(pData->msg->pCont); taosMemoryFree(pData->msg); pData->msg = NULL; @@ -240,14 +240,15 @@ int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); - tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever); + tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, + ever); char buf1[80] = {0}; char buf2[80] = {0}; tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", - vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, + dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); return 0; } @@ -263,8 +264,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, - vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, + pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); return 0; } @@ -336,8 +337,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); if (pHandle == NULL) { - tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, - pOffset->subKey); + tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey); terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -353,7 +353,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) } taosRUnLockLatch(&pTq->lock); - //3. check the offset info + // 3. check the offset info STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); if (pSavedOffset != NULL) { if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { @@ -381,7 +381,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); } else { - tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version); + tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version); } if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { @@ -423,7 +423,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq req = {0}; - int code = 0; + int code = 0; if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); terrno = TSDB_CODE_INVALID_MSG; @@ -449,7 +449,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // 2. check re-balance status if (pHandle->consumerId != consumerId) { - tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, + tqError("ERROR tmq poll: consumer:0x%" PRIx64 + " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; taosWUnLockLatch(&pTq->lock); @@ -457,22 +458,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } bool exec = tqIsHandleExec(pHandle); - if(!exec) { + if (!exec) { tqSetHandleExec(pHandle); -// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + // qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, + req.subKey, pHandle); taosWUnLockLatch(&pTq->lock); break; } taosWUnLockLatch(&pTq->lock); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + tqDebug("tmq poll: consumer:0x%" PRIx64 + "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", + consumerId, vgId, req.subKey, pHandle); taosMsleep(10); } // 3. update the epoch value if (pHandle->epoch < reqEpoch) { - tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, reqEpoch); + tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, + reqEpoch); pHandle->epoch = reqEpoch; } @@ -484,7 +489,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); tqSetHandleIdle(pHandle); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, + req.subKey, pHandle); return code; } @@ -548,7 +554,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { if (reqOffset.type == TMQ_OFFSET__LOG) { int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); - if (currentVer == -1) { // not start to read data from wal yet, return req offset directly + if (currentVer == -1) { // not start to read data from wal yet, return req offset directly dataRsp.rspOffset.version = reqOffset.version; } else { dataRsp.rspOffset.version = currentVer; // return current consume offset value @@ -572,7 +578,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - int32_t vgId = TD_VID(pTq->pVnode); + int32_t vgId = TD_VID(pTq->pVnode); tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); int32_t code = 0; @@ -581,7 +587,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { while (tqIsHandleExec(pHandle)) { - tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, + pHandle->subKey, pHandle); taosMsleep(10); } @@ -641,9 +648,18 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t } int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - int ret = 0; + int ret = 0; SMqRebVgReq req = {0}; - tDecodeSMqRebVgReq(msg, &req); + SDecoder dc = {0}; + + tDecoderInit(&dc, msg, msgLen); + + // decode req + if (tDecodeSMqRebVgReq(&dc, &req) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + tDecoderClear(&dc); + return -1; + } SVnode* pVnode = pTq->pVnode; int32_t vgId = TD_VID(pVnode); @@ -689,8 +705,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->snapshotVer = ver; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - pHandle->execHandle.execCol.qmsg = req.qmsg; - req.qmsg = NULL; + pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg); pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId); @@ -710,10 +725,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); pHandle->execHandle.execTb.suid = req.suid; - pHandle->execHandle.execTb.qmsg = req.qmsg; - req.qmsg = NULL; + pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg); - if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { + if (strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(), pVnode->config.vgId, req.subKey, pHandle->consumerId); @@ -727,45 +741,47 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SArray* tbUidList = NULL; ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task); - if(ret != TDB_CODE_SUCCESS) { - tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId); + if (ret != TDB_CODE_SUCCESS) { + tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, + pHandle->consumerId); taosArrayDestroy(tbUidList); goto end; } - tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid); + tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, + pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid); pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); - tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, - pHandle->consumerId); + tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); goto end; } else { taosWLockLatch(&pTq->lock); if (pHandle->consumerId == req.newConsumerId) { // do nothing - tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, req.newConsumerId); + tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, + req.newConsumerId); } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); } -// atomic_add_fetch_32(&pHandle->epoch, 1); + // atomic_add_fetch_32(&pHandle->epoch, 1); // kill executing task -// if(tqIsHandleExec(pHandle)) { -// qTaskInfo_t pTaskInfo = pHandle->execHandle.task; -// if (pTaskInfo != NULL) { -// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); -// } + // if(tqIsHandleExec(pHandle)) { + // qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + // if (pTaskInfo != NULL) { + // qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + // } -// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { -// qStreamCloseTsdbReader(pTaskInfo); -// } -// } + // if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + // qStreamCloseTsdbReader(pTaskInfo); + // } + // } // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); taosWUnLockLatch(&pTq->lock); @@ -773,13 +789,11 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } end: - taosMemoryFree(req.qmsg); + tDecoderClear(&dc); return ret; } -void freePtr(void *ptr) { - taosMemoryFree(*(void**)ptr); -} +void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); @@ -802,7 +816,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->chkInfo.currentVer = ver; // expand executor - pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL; + pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -868,8 +882,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupTrigger(pTask); - tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, - pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); + tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, + pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); // next valid version will add one pTask->chkInfo.version += 1; @@ -982,7 +996,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask)); + tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, + (int32_t)sizeof(SStreamTask)); return -1; } @@ -1097,7 +1112,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t // do recovery step 2 int64_t st = taosGetTimestampMs(); - tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st); + tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st); code = streamSourceRecoverScanStep2(pTask, sversion); if (code < 0) { @@ -1105,7 +1120,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion); + qDebug("s-task:%s set the start wal offset to be:%" PRId64, pTask->id.idStr, sversion); walReaderSeekVer(pTask->exec.pWalReader, sversion); pTask->chkInfo.currentVer = sversion; @@ -1129,7 +1144,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - double el = (taosGetTimestampMs() - st)/ 1000.0; + double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el); // dispatch recover finish req to all related downstream task @@ -1245,8 +1260,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { - tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, - pTask->id.idStr, pTask->chkInfo.version); + tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.version); streamProcessRunReq(pTask); } else { if (streamTaskShouldPause(&pTask->status)) { @@ -1265,9 +1280,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); SStreamDispatchReq req = {0}; @@ -1313,7 +1328,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 146552b058..6d17672513 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +// clang-format off + #include "executorInt.h" #include "filter.h" #include "function.h" @@ -2444,6 +2446,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys int32_t len = 0; pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len); streamScanOperatorDecode(buff, len, pInfo); + taosMemoryFree(buff); } setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, @@ -3458,3 +3461,5 @@ static void destoryTableCountScanOperator(void* param) { taosArrayDestroy(pTableCountScanInfo->stbUidList); taosMemoryFreeClear(param); } + +// clang-format on diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c743ecf7f7..b3995f020b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -87,8 +87,6 @@ void* streamBackendInit(const char* path) { pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); - rocksdb_env_set_low_priority_background_threads(env, 4); - rocksdb_env_set_high_priority_background_threads(env, 2); rocksdb_cache_t* cache = rocksdb_cache_create_lru(64 << 20); @@ -574,9 +572,14 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { *dest = NULL; return -1; } - int64_t now = taosGetTimestampMs(); p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); + if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) { + if (dest != NULL) *dest = NULL; + qError("vlen: %d, read len: %d", vlen, key.len); + return -1; + } + if (key.len == 0) { key.data = NULL; } else { @@ -584,6 +587,7 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { } if (ttl != NULL) { + int64_t now = taosGetTimestampMs(); *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now; } if (dest != NULL) { @@ -1005,35 +1009,35 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]); } -#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(pState, funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = \ - ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - char* ttlV = NULL; \ - int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ - rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ - code = -1; \ - } else { \ - qTrace("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \ - } \ - taosMemoryFree(ttlV); \ +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(pState, funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = \ + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ + if (err != NULL) { \ + taosMemoryFree(err); \ + qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + code = -1; \ + } else { \ + qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \ + } \ + taosMemoryFree(ttlV); \ } while (0); #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ @@ -1056,7 +1060,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ size_t len = 0; \ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL) { \ + if (val == NULL || len == 0) { \ if (err == NULL) { \ qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ funcname); \ @@ -1068,17 +1072,17 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = -1; \ } else { \ char* p = NULL; \ - int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ - if (len < 0) { \ + int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ + if (tlen <= 0) { \ qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \ funcname); \ code = -1; \ } else { \ qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ - len); \ + tlen); \ } \ taosMemoryFree(val); \ - if (vLen != NULL) *vLen = len; \ + if (vLen != NULL) *vLen = tlen; \ } \ if (code == 0) \ qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \ @@ -1924,17 +1928,17 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen); + STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen); + STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { int code = 0; - STREAM_STATE_DEL_ROCKSDB(pState, "default", &key); + STREAM_STATE_DEL_ROCKSDB(pState, "default", key); return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 716b939e5f..49b35ce8db 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,11 +16,11 @@ #include "streamInc.h" // maximum allowed processed block batches. One block may include several submit blocks -#define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 8 -#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MIN_STREAM_EXEC_BATCH_NUM 8 +#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 -static int32_t updateCheckPointInfo (SStreamTask* pTask); +static int32_t updateCheckPointInfo(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); @@ -48,10 +48,11 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return -1; } - qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); + qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, + size / 1048576.0); code = streamTaskOutputResultBlock(pTask, pStreamBlocks); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position destroyStreamDataBlock(pStreamBlocks); return -1; } @@ -65,7 +66,8 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return TSDB_CODE_SUCCESS; } -static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, + int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -82,7 +84,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i } if (streamTaskShouldStop(&pTask->status)) { - taosArrayDestroy(pRes); // memory leak + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } @@ -99,9 +101,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - SSDataBlock block = {0}; - - const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem; + SSDataBlock block = {0}; + const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); @@ -153,7 +154,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); } else { - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } return code; @@ -286,7 +287,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif -int32_t updateCheckPointInfo (SStreamTask* pTask) { +int32_t updateCheckPointInfo(SStreamTask* pTask) { int64_t ckId = 0; int64_t dataVer = 0; qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); @@ -294,7 +295,8 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) { SCheckpointInfo* pCkInfo = &pTask->chkInfo; if (ckId > pCkInfo->id) { // save it since the checkpoint is updated qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 - ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); + ", checkPoint id:%" PRId64 " -> %" PRId64, + pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer}; @@ -417,14 +419,15 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, - pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, + pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; SArray* pBlockList = pBlock->blocks; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, + pBlock->sourceVer); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; @@ -445,8 +448,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { int32_t totalBlocks = 0; streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); - double el = (taosGetTimestampMs() - st) / 1000.0; - qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks); + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, + resSize / 1048576.0, totalBlocks); streamFreeQitem(pInput); } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index fff666ec9f..85be120dbd 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -20,9 +20,9 @@ #include "ttime.h" #define DEFAULT_FALSE_POSITIVE 0.01 -#define DEFAULT_BUCKET_SIZE 1310720 -#define DEFAULT_MAP_CAPACITY 1310720 -#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10) +#define DEFAULT_BUCKET_SIZE 131072 +#define DEFAULT_MAP_CAPACITY 131072 +#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100) #define ROWS_PER_MILLISECOND 1 #define MAX_NUM_SCALABLE_BF 100000 #define MIN_NUM_SCALABLE_BF 10 @@ -44,8 +44,8 @@ static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { } } -static void clearItemHelper(void* p) { - SScalableBf** pBf = p; +static void clearItemHelper(void *p) { + SScalableBf **pBf = p; tScalableBfDestroy(*pBf); } @@ -274,7 +274,7 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) { } int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) { - if(!pInfo) { + if (!pInfo) { return 0; } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index f531f65565..757b1de6b6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -16,12 +16,12 @@ #include "tstreamFileState.h" #include "query.h" +#include "storageapi.h" #include "streamBackendRocksdb.h" #include "taos.h" #include "tcommon.h" #include "thash.h" #include "tsimplehash.h" -#include "storageapi.h" #define FLUSH_RATIO 0.5 #define FLUSH_NUM 4 @@ -416,10 +416,13 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t len = 0; memcpy(buf, taskKey, strlen(taskKey)); code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); - if (code != 0) { + if (code != 0 || len == 0 || val == NULL) { return TSDB_CODE_FAILED; } - sscanf(val, "%" PRId64 "", &maxCheckPointId); + memcpy(val, buf, len); + buf[len] = 0; + maxCheckPointId = atol((char*)buf); + taosMemoryFree(val); } for (int64_t i = maxCheckPointId; i > 0; i--) { char buf[128] = {0}; @@ -430,13 +433,16 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { if (code != 0) { return TSDB_CODE_FAILED; } + memcpy(val, buf, len); + buf[len] = 0; + taosMemoryFree(val); + TSKEY ts; - sscanf(val, "%" PRId64 "", &ts); + ts = atol((char*)buf); if (ts < mark) { // statekey winkey.ts < mark forceRemoveCheckpoint(pFileState, i); break; - } else { } } return code; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 74b0dddbd9..c56852b88c 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -564,7 +564,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py -#,,n,system-test,python3 ./test.py -f 0-others/compatibility.py +,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py ,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py ,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py diff --git a/tests/system-test/1-insert/manyVgroups.json b/tests/system-test/1-insert/manyVgroups.json index 3b0fa96b08..8c6f39cf96 100644 --- a/tests/system-test/1-insert/manyVgroups.json +++ b/tests/system-test/1-insert/manyVgroups.json @@ -11,7 +11,7 @@ "confirm_parameter_prompt": "no", "insert_interval": 0, "interlace_rows": 0, - "num_of_records_per_req": 100000, + "num_of_records_per_req": 10000, "databases": [ { "dbinfo": { @@ -73,4 +73,4 @@ ] } ] -} \ No newline at end of file +}