From 993ae8439890339b3e2eae353f8024a7c8f2f503 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 16 Aug 2023 07:17:15 +0000 Subject: [PATCH] close backend --- source/dnode/vnode/src/tq/tq.c | 348 +++++++++--------- source/dnode/vnode/src/tsdb/tsdbOpen.c | 3 + source/libs/stream/src/streamBackendRocksdb.c | 2 +- source/libs/stream/src/streamMeta.c | 4 +- source/libs/stream/src/streamState.c | 7 +- 5 files changed, 190 insertions(+), 174 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b75455de6c..3be7b45c68 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -141,6 +141,7 @@ int32_t tqInitialize(STQ* pTq) { } void tqClose(STQ* pTq) { + qDebug("start to close tq"); if (pTq == NULL) { return; } @@ -150,7 +151,7 @@ void tqClose(STQ* pTq) { STqHandle* pHandle = *(STqHandle**)pIter; int32_t vgId = TD_VID(pTq->pVnode); - if(pHandle->msg != NULL) { + if (pHandle->msg != NULL) { tqPushEmptyDataRsp(pHandle, vgId); rpcFreeCont(pHandle->msg->pCont); taosMemoryFree(pHandle->msg); @@ -166,6 +167,7 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq->path); tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); + qDebug("end to close tq"); taosMemoryFree(pTq); } @@ -229,54 +231,54 @@ void tqNotifyClose(STQ* pTq) { tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); } -//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, -// int64_t consumerId, int32_t type) { -// int32_t len = 0; -// int32_t code = 0; +// static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, +// int64_t consumerId, int32_t type) { +// int32_t len = 0; +// int32_t code = 0; // -// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { -// tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); -// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { -// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); -// } +// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { +// tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); +// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { +// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); +// } // -// if (code < 0) { -// return -1; -// } +// if (code < 0) { +// return -1; +// } // -// int32_t tlen = sizeof(SMqRspHead) + len; -// void* buf = rpcMallocCont(tlen); -// if (buf == NULL) { -// return -1; -// } +// int32_t tlen = sizeof(SMqRspHead) + len; +// void* buf = rpcMallocCont(tlen); +// if (buf == NULL) { +// return -1; +// } // -// ((SMqRspHead*)buf)->mqMsgType = type; -// ((SMqRspHead*)buf)->epoch = epoch; -// ((SMqRspHead*)buf)->consumerId = consumerId; +// ((SMqRspHead*)buf)->mqMsgType = type; +// ((SMqRspHead*)buf)->epoch = epoch; +// ((SMqRspHead*)buf)->consumerId = consumerId; // -// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); +// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); // -// SEncoder encoder = {0}; -// tEncoderInit(&encoder, abuf, len); +// SEncoder encoder = {0}; +// tEncoderInit(&encoder, abuf, len); // -// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { -// tEncodeMqDataRsp(&encoder, pRsp); -// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { -// tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); -// } +// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { +// tEncodeMqDataRsp(&encoder, pRsp); +// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { +// tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); +// } // -// tEncoderClear(&encoder); +// tEncoderClear(&encoder); // -// SRpcMsg rsp = { -// .info = *pRpcHandleInfo, -// .pCont = buf, -// .contLen = tlen, -// .code = 0, -// }; +// SRpcMsg rsp = { +// .info = *pRpcHandleInfo, +// .pCont = buf, +// .contLen = tlen, +// .code = 0, +// }; // -// tmsgSendRsp(&rsp); -// return 0; -//} +// tmsgSendRsp(&rsp); +// return 0; +// } int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { SMqPollReq req = {0}; @@ -292,32 +294,33 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { dataRsp.rspOffset = dataRsp.reqOffset; char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset); - tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId); + tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, + req.reqId); tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); tDeleteMqDataRsp(&dataRsp); return 0; } -//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { -// SMqDataRsp dataRsp = {0}; -// dataRsp.head.consumerId = pHandle->consumerId; -// dataRsp.head.epoch = pHandle->epoch; -// dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; +// int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { +// SMqDataRsp dataRsp = {0}; +// dataRsp.head.consumerId = pHandle->consumerId; +// dataRsp.head.epoch = pHandle->epoch; +// dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; // -// int64_t sver = 0, ever = 0; -// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); -// tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, -// ever); +// int64_t sver = 0, ever = 0; +// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); +// tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, +// ever); // -// char buf1[TSDB_OFFSET_LEN] = {0}; -// char buf2[TSDB_OFFSET_LEN] = {0}; -// tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); -// tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); -// tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, -// dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); -// return 0; -//} +// char buf1[TSDB_OFFSET_LEN] = {0}; +// char buf2[TSDB_OFFSET_LEN] = {0}; +// tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); +// tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); +// tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, +// dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); +// return 0; +// } int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId) { @@ -365,7 +368,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) { tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, - vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); + vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); return 0; // no need to update the offset value } @@ -378,10 +381,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t } int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { - SMqSeekReq req = {0}; - int32_t vgId = TD_VID(pTq->pVnode); - SRpcMsg rsp = {.info = pMsg->info}; - int code = 0; + SMqSeekReq req = {0}; + int32_t vgId = TD_VID(pTq->pVnode); + SRpcMsg rsp = {.info = pMsg->info}; + int code = 0; if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -407,8 +410,8 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { goto end; } - //if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to TMQ_VG_STATUS__IDLE, - //otherwise poll data failed after seek. + // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to + // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek. tqUnregisterPushHandle(pTq, pHandle); taosRUnLockLatch(&pTq->lock); @@ -417,85 +420,85 @@ end: tmsgSendRsp(&rsp); return 0; -// SMqVgOffset vgOffset = {0}; -// int32_t vgId = TD_VID(pTq->pVnode); -// -// SDecoder decoder; -// tDecoderInit(&decoder, (uint8_t*)msg, msgLen); -// if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { -// tqError("vgId:%d failed to decode seek msg", vgId); -// return -1; -// } -// -// tDecoderClear(&decoder); -// -// tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64, -// vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version); -// -// STqOffset* pOffset = &vgOffset.offset; -// if (pOffset->val.type != TMQ_OFFSET__LOG) { -// tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type); -// return -1; -// } -// -// STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); -// if (pHandle == NULL) { -// tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey); -// terrno = TSDB_CODE_INVALID_MSG; -// return -1; -// } -// -// // 2. check consumer-vg assignment status -// taosRLockLatch(&pTq->lock); -// if (pHandle->consumerId != vgOffset.consumerId) { -// tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, -// vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId); -// terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; -// taosRUnLockLatch(&pTq->lock); -// return -1; -// } -// taosRUnLockLatch(&pTq->lock); -// -// // 3. check the offset info -// STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); -// if (pSavedOffset != NULL) { -// if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { -// tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey); -// return 0; // no need to update the offset value -// } -// -// if (pSavedOffset->val.version == pOffset->val.version) { -// tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, -// pOffset->val.version, pSavedOffset->val.version); -// return 0; -// } -// } -// -// int64_t sver = 0, ever = 0; -// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); -// if (pOffset->val.version < sver) { -// pOffset->val.version = sver; -// } else if (pOffset->val.version > ever) { -// pOffset->val.version = ever; -// } -// -// // save the new offset value -// if (pSavedOffset != NULL) { -// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, -// pSavedOffset->val.version); -// } else { -// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version); -// } -// -// if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { -// tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version); -// return -1; -// } -// -// tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId, -// vgOffset.consumerId, vgOffset.offset.val.version); -// -// return 0; + // SMqVgOffset vgOffset = {0}; + // int32_t vgId = TD_VID(pTq->pVnode); + // + // SDecoder decoder; + // tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + // if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { + // tqError("vgId:%d failed to decode seek msg", vgId); + // return -1; + // } + // + // tDecoderClear(&decoder); + // + // tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64, + // vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version); + // + // STqOffset* pOffset = &vgOffset.offset; + // if (pOffset->val.type != TMQ_OFFSET__LOG) { + // tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type); + // return -1; + // } + // + // STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); + // if (pHandle == NULL) { + // tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, + // pOffset->subKey); terrno = TSDB_CODE_INVALID_MSG; return -1; + // } + // + // // 2. check consumer-vg assignment status + // taosRLockLatch(&pTq->lock); + // if (pHandle->consumerId != vgOffset.consumerId) { + // tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" + // PRIx64, + // vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId); + // terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + // taosRUnLockLatch(&pTq->lock); + // return -1; + // } + // taosRUnLockLatch(&pTq->lock); + // + // // 3. check the offset info + // STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); + // if (pSavedOffset != NULL) { + // if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { + // tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey); + // return 0; // no need to update the offset value + // } + // + // if (pSavedOffset->val.version == pOffset->val.version) { + // tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, + // pOffset->val.version, pSavedOffset->val.version); + // return 0; + // } + // } + // + // int64_t sver = 0, ever = 0; + // walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); + // if (pOffset->val.version < sver) { + // pOffset->val.version = sver; + // } else if (pOffset->val.version > ever) { + // pOffset->val.version = ever; + // } + // + // // save the new offset value + // if (pSavedOffset != NULL) { + // tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, + // pSavedOffset->val.version); + // } else { + // tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version); + // } + // + // if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { + // tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version); + // return -1; + // } + // + // tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId, + // vgOffset.consumerId, vgOffset.offset.val.version); + // + // return 0; } int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { @@ -577,10 +580,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // 1. find handle pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { - do{ - if (tqMetaGetHandle(pTq, req.subKey) == 0){ + do { + if (tqMetaGetHandle(pTq, req.subKey) == 0) { pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if(pHandle != NULL){ + if (pHandle != NULL) { break; } } @@ -588,7 +591,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { terrno = TSDB_CODE_INVALID_MSG; taosWUnLockLatch(&pTq->lock); return -1; - }while(0); + } while (0); } // 2. check re-balance status @@ -639,7 +642,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { - void* data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + void* data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); SMqVgOffset vgOffset = {0}; @@ -732,7 +735,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { if (reqOffset.type == TMQ_OFFSET__LOG) { dataRsp.rspOffset.version = reqOffset.version; - } else if(reqOffset.type < 0){ + } else if (reqOffset.type < 0) { STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); if (pOffset != NULL) { if (pOffset->val.type != TMQ_OFFSET__LOG) { @@ -743,14 +746,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { } dataRsp.rspOffset.version = pOffset->val.version; - tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); - }else{ + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId, + req.subKey, dataRsp.rspOffset.version); + } else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { dataRsp.rspOffset.version = ever; } - tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey, + dataRsp.rspOffset.version); } } else { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, @@ -851,7 +856,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, - req.oldConsumerId, req.newConsumerId); + req.oldConsumerId, req.newConsumerId); STqHandle* pHandle = NULL; while (1) { @@ -883,7 +888,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pHandle->consumerId == req.newConsumerId) { // do nothing tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); } else { - tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); + tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, + req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); // atomic_add_fetch_32(&pHandle->epoch, 1); @@ -1031,7 +1037,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 - " child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms", + " child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); @@ -1213,7 +1219,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamTaskEnablePause(pTask); } } else { - tqDebug("s-task:%s resume from paused, start ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.step1Start); + tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->tsInfo.step1Start); } // we have to continue retrying to successfully execute the scan history task. @@ -1364,11 +1370,13 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); - tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId); + tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, + req.downstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId); if (pTask == NULL) { - tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId); + tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", + req.downstreamTaskId); return -1; } @@ -1442,7 +1450,8 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { } else { tqDebug( "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " - "completed msg", pTask->id.idStr, req.downstreamId); + "completed msg", + pTask->id.idStr, req.downstreamId); streamProcessScanHistoryFinishRsp(pTask); } @@ -1483,7 +1492,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); tqStartStreamTasks(pTq); return 0; - } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. + } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. // todo add one function to handle this tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId); return -1; @@ -1610,7 +1619,8 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } - if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { + if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && + pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { streamStartScanHistoryAsync(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { tqStartStreamTasks(pTq); @@ -1625,13 +1635,14 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); - int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); + int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); if (code != 0) { return code; } - SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); + SStreamTask* pHistoryTask = + streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHistoryTask) { code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); } @@ -1652,7 +1663,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId); if (pTask == NULL) { -// tDeleteStreamDispatchReq(&req); + // tDeleteStreamDispatchReq(&req); return -1; } @@ -1687,7 +1698,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { } tDecoderClear(&decoder); - int32_t taskId = req.taskId; + int32_t taskId = req.taskId; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId); if (pTask != NULL) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; @@ -1697,7 +1708,6 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { taosFreeQitem(pMsg); return 0; } else { - tDeleteStreamDispatchReq(&req); } @@ -1729,7 +1739,7 @@ FAIL: pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL; int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); - SRpcMsg rsp = { .code = code, .info = pMsg->info, .contLen = len, .pCont = pRspHead}; + SRpcMsg rsp = {.code = code, .info = pMsg->info, .contLen = len, .pCont = pRspHead}; tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code)); tmsgSendRsp(&rsp); @@ -1837,7 +1847,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); - bool startTask = vnodeIsRoleLeader(pTq->pVnode); // in case of follower, do not launch task + bool startTask = vnodeIsRoleLeader(pTq->pVnode); // in case of follower, do not launch task SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; SStreamTaskNodeUpdateMsg req = {0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index c684ad5184..14aa2a84a9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -88,6 +88,9 @@ _err: int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { + STsdb *pdb = *pTsdb; + tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d", TD_VID(pdb->pVnode), pdb->path, pdb->keepCfg.days, + pdb->keepCfg.keep0, pdb->keepCfg.keep1, pdb->keepCfg.keep2); taosThreadRwlockWrlock(&(*pTsdb)->rwLock); tsdbMemTableDestroy((*pTsdb)->mem, true); (*pTsdb)->mem = NULL; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4d3d70dcd1..c3c2847a3f 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1175,7 +1175,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t } int streamStateOpenBackend(void* backend, SStreamState* pState) { qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); - taosAcquireRef(streamBackendId, pState->streamBackendRid); + // taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendWrapper* handle = backend; SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); taosThreadMutexLock(&handle->cfMutex); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c7fc7f72ea..6408a92195 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -124,7 +124,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); - qInfo("vgId:%d open stream meta successfully, latest checkpoint:%"PRId64", stage:%" PRId64, vgId, chkpId, stage); + qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, chkpId, stage); return pMeta; _err: @@ -142,6 +142,7 @@ _err: } void streamMetaClose(SStreamMeta* pMeta) { + qDebug("start to close stream meta"); tdbAbort(pMeta->db, pMeta->txn); tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pCheckpointDb); @@ -167,6 +168,7 @@ void streamMetaClose(SStreamMeta* pMeta) { taosArrayDestroy(pMeta->checkpointInUse); taosMemoryFree(pMeta); + qDebug("end to close stream meta"); } int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 5b42be182c..c0dd40bb4d 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -128,7 +128,6 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz if (uniqueId == NULL) { int code = streamStateOpenBackend(pMeta->streamBackend, pState); if (code == -1) { - taosReleaseRef(streamBackendId, pState->streamBackendRid); taosThreadMutexUnlock(&pMeta->backendMutex); taosMemoryFree(pState); return NULL; @@ -729,7 +728,8 @@ void streamStateFreeVal(void* val) { int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB - qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey,key->win.ekey, key->groupId); + qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey, + key->groupId); return streamStateSessionPut_rocksdb(pState, key, value, vLen); #else SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; @@ -763,7 +763,8 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { #ifdef USE_ROCKSDB - qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey,key->win.ekey, key->groupId); + qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey, + key->groupId); return streamStateSessionDel_rocksdb(pState, key); #else SStateSessionKey sKey = {.key = *key, .opNum = pState->number};