close backend

This commit is contained in:
yihaoDeng 2023-08-16 07:17:15 +00:00
parent 7815e44655
commit 993ae84398
5 changed files with 190 additions and 174 deletions

View File

@ -141,6 +141,7 @@ int32_t tqInitialize(STQ* pTq) {
} }
void tqClose(STQ* pTq) { void tqClose(STQ* pTq) {
qDebug("start to close tq");
if (pTq == NULL) { if (pTq == NULL) {
return; return;
} }
@ -150,7 +151,7 @@ void tqClose(STQ* pTq) {
STqHandle* pHandle = *(STqHandle**)pIter; STqHandle* pHandle = *(STqHandle**)pIter;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
if(pHandle->msg != NULL) { if (pHandle->msg != NULL) {
tqPushEmptyDataRsp(pHandle, vgId); tqPushEmptyDataRsp(pHandle, vgId);
rpcFreeCont(pHandle->msg->pCont); rpcFreeCont(pHandle->msg->pCont);
taosMemoryFree(pHandle->msg); taosMemoryFree(pHandle->msg);
@ -166,6 +167,7 @@ void tqClose(STQ* pTq) {
taosMemoryFree(pTq->path); taosMemoryFree(pTq->path);
tqMetaClose(pTq); tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta); streamMetaClose(pTq->pStreamMeta);
qDebug("end to close tq");
taosMemoryFree(pTq); taosMemoryFree(pTq);
} }
@ -229,54 +231,54 @@ void tqNotifyClose(STQ* pTq) {
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
} }
//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, // static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
// int64_t consumerId, int32_t type) { // int64_t consumerId, int32_t type) {
// int32_t len = 0; // int32_t len = 0;
// int32_t code = 0; // int32_t code = 0;
// //
// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { // if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
// tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); // tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { // } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); // tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
// } // }
// //
// if (code < 0) { // if (code < 0) {
// return -1; // return -1;
// } // }
// //
// int32_t tlen = sizeof(SMqRspHead) + len; // int32_t tlen = sizeof(SMqRspHead) + len;
// void* buf = rpcMallocCont(tlen); // void* buf = rpcMallocCont(tlen);
// if (buf == NULL) { // if (buf == NULL) {
// return -1; // return -1;
// } // }
// //
// ((SMqRspHead*)buf)->mqMsgType = type; // ((SMqRspHead*)buf)->mqMsgType = type;
// ((SMqRspHead*)buf)->epoch = epoch; // ((SMqRspHead*)buf)->epoch = epoch;
// ((SMqRspHead*)buf)->consumerId = consumerId; // ((SMqRspHead*)buf)->consumerId = consumerId;
// //
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); // void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
// //
// SEncoder encoder = {0}; // SEncoder encoder = {0};
// tEncoderInit(&encoder, abuf, len); // tEncoderInit(&encoder, abuf, len);
// //
// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { // if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
// tEncodeMqDataRsp(&encoder, pRsp); // tEncodeMqDataRsp(&encoder, pRsp);
// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { // } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
// tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); // tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
// } // }
// //
// tEncoderClear(&encoder); // tEncoderClear(&encoder);
// //
// SRpcMsg rsp = { // SRpcMsg rsp = {
// .info = *pRpcHandleInfo, // .info = *pRpcHandleInfo,
// .pCont = buf, // .pCont = buf,
// .contLen = tlen, // .contLen = tlen,
// .code = 0, // .code = 0,
// }; // };
// //
// tmsgSendRsp(&rsp); // tmsgSendRsp(&rsp);
// return 0; // return 0;
//} // }
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
SMqPollReq req = {0}; SMqPollReq req = {0};
@ -292,32 +294,33 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
dataRsp.rspOffset = dataRsp.reqOffset; dataRsp.rspOffset = dataRsp.reqOffset;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId); tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf,
req.reqId);
tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
return 0; return 0;
} }
//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { // int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
// SMqDataRsp dataRsp = {0}; // SMqDataRsp dataRsp = {0};
// dataRsp.head.consumerId = pHandle->consumerId; // dataRsp.head.consumerId = pHandle->consumerId;
// dataRsp.head.epoch = pHandle->epoch; // dataRsp.head.epoch = pHandle->epoch;
// dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; // dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
// //
// int64_t sver = 0, ever = 0; // int64_t sver = 0, ever = 0;
// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); // walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
// tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, // tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
// ever); // ever);
// //
// char buf1[TSDB_OFFSET_LEN] = {0}; // char buf1[TSDB_OFFSET_LEN] = {0};
// char buf2[TSDB_OFFSET_LEN] = {0}; // char buf2[TSDB_OFFSET_LEN] = {0};
// tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); // tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
// tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); // tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
// tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, // tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
// dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); // dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
// return 0; // return 0;
//} // }
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId) { int32_t type, int32_t vgId) {
@ -365,7 +368,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) { if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) {
tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
return 0; // no need to update the offset value return 0; // no need to update the offset value
} }
@ -378,10 +381,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
} }
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
SMqSeekReq req = {0}; SMqSeekReq req = {0};
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SRpcMsg rsp = {.info = pMsg->info}; SRpcMsg rsp = {.info = pMsg->info};
int code = 0; int code = 0;
if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) { if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -407,8 +410,8 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
goto end; goto end;
} }
//if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to TMQ_VG_STATUS__IDLE, // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
//otherwise poll data failed after seek. // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
tqUnregisterPushHandle(pTq, pHandle); tqUnregisterPushHandle(pTq, pHandle);
taosRUnLockLatch(&pTq->lock); taosRUnLockLatch(&pTq->lock);
@ -417,85 +420,85 @@ end:
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
return 0; return 0;
// SMqVgOffset vgOffset = {0}; // SMqVgOffset vgOffset = {0};
// int32_t vgId = TD_VID(pTq->pVnode); // int32_t vgId = TD_VID(pTq->pVnode);
// //
// SDecoder decoder; // SDecoder decoder;
// tDecoderInit(&decoder, (uint8_t*)msg, msgLen); // tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
// if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { // if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
// tqError("vgId:%d failed to decode seek msg", vgId); // tqError("vgId:%d failed to decode seek msg", vgId);
// return -1; // return -1;
// } // }
// //
// tDecoderClear(&decoder); // tDecoderClear(&decoder);
// //
// tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64, // tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
// vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version); // vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);
// //
// STqOffset* pOffset = &vgOffset.offset; // STqOffset* pOffset = &vgOffset.offset;
// if (pOffset->val.type != TMQ_OFFSET__LOG) { // if (pOffset->val.type != TMQ_OFFSET__LOG) {
// tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type); // tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
// return -1; // return -1;
// } // }
// //
// STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); // STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
// if (pHandle == NULL) { // if (pHandle == NULL) {
// tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey); // tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId,
// terrno = TSDB_CODE_INVALID_MSG; // pOffset->subKey); terrno = TSDB_CODE_INVALID_MSG; return -1;
// return -1; // }
// } //
// // // 2. check consumer-vg assignment status
// // 2. check consumer-vg assignment status // taosRLockLatch(&pTq->lock);
// taosRLockLatch(&pTq->lock); // if (pHandle->consumerId != vgOffset.consumerId) {
// if (pHandle->consumerId != vgOffset.consumerId) { // tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%"
// tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, // PRIx64,
// vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId); // vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
// terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; // terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
// taosRUnLockLatch(&pTq->lock); // taosRUnLockLatch(&pTq->lock);
// return -1; // return -1;
// } // }
// taosRUnLockLatch(&pTq->lock); // taosRUnLockLatch(&pTq->lock);
// //
// // 3. check the offset info // // 3. check the offset info
// STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); // STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
// if (pSavedOffset != NULL) { // if (pSavedOffset != NULL) {
// if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { // if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
// tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey); // tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
// return 0; // no need to update the offset value // return 0; // no need to update the offset value
// } // }
// //
// if (pSavedOffset->val.version == pOffset->val.version) { // if (pSavedOffset->val.version == pOffset->val.version) {
// tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, // tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
// pOffset->val.version, pSavedOffset->val.version); // pOffset->val.version, pSavedOffset->val.version);
// return 0; // return 0;
// } // }
// } // }
// //
// int64_t sver = 0, ever = 0; // int64_t sver = 0, ever = 0;
// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); // walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
// if (pOffset->val.version < sver) { // if (pOffset->val.version < sver) {
// pOffset->val.version = sver; // pOffset->val.version = sver;
// } else if (pOffset->val.version > ever) { // } else if (pOffset->val.version > ever) {
// pOffset->val.version = ever; // pOffset->val.version = ever;
// } // }
// //
// // save the new offset value // // save the new offset value
// if (pSavedOffset != NULL) { // if (pSavedOffset != NULL) {
// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, // tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
// pSavedOffset->val.version); // pSavedOffset->val.version);
// } else { // } else {
// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version); // tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
// } // }
// //
// if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { // if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
// tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version); // tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
// return -1; // return -1;
// } // }
// //
// tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId, // tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
// vgOffset.consumerId, vgOffset.offset.val.version); // vgOffset.consumerId, vgOffset.offset.val.version);
// //
// return 0; // return 0;
} }
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
@ -577,10 +580,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// 1. find handle // 1. find handle
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
do{ do {
if (tqMetaGetHandle(pTq, req.subKey) == 0){ if (tqMetaGetHandle(pTq, req.subKey) == 0) {
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if(pHandle != NULL){ if (pHandle != NULL) {
break; break;
} }
} }
@ -588,7 +591,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
return -1; return -1;
}while(0); } while (0);
} }
// 2. check re-balance status // 2. check re-balance status
@ -639,7 +642,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
void* data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); void* data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
SMqVgOffset vgOffset = {0}; SMqVgOffset vgOffset = {0};
@ -732,7 +735,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
if (reqOffset.type == TMQ_OFFSET__LOG) { if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = reqOffset.version; dataRsp.rspOffset.version = reqOffset.version;
} else if(reqOffset.type < 0){ } else if (reqOffset.type < 0) {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
if (pOffset != NULL) { if (pOffset != NULL) {
if (pOffset->val.type != TMQ_OFFSET__LOG) { if (pOffset->val.type != TMQ_OFFSET__LOG) {
@ -743,14 +746,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
} }
dataRsp.rspOffset.version = pOffset->val.version; dataRsp.rspOffset.version = pOffset->val.version;
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
}else{ req.subKey, dataRsp.rspOffset.version);
} else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
dataRsp.rspOffset.version = ever; dataRsp.rspOffset.version = ever;
} }
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
dataRsp.rspOffset.version);
} }
} else { } else {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
@ -851,7 +856,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} }
tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId); req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = NULL; STqHandle* pHandle = NULL;
while (1) { while (1) {
@ -883,7 +888,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pHandle->consumerId == req.newConsumerId) { // do nothing if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
} else { } else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId);
// atomic_add_fetch_32(&pHandle->epoch, 1); // atomic_add_fetch_32(&pHandle->epoch, 1);
@ -1031,7 +1037,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
} }
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
" child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms", " child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
@ -1213,7 +1219,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamTaskEnablePause(pTask); streamTaskEnablePause(pTask);
} }
} else { } else {
tqDebug("s-task:%s resume from paused, start ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.step1Start); tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->tsInfo.step1Start);
} }
// we have to continue retrying to successfully execute the scan history task. // we have to continue retrying to successfully execute the scan history task.
@ -1364,11 +1370,13 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId); tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId,
req.downstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId); tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed",
req.downstreamTaskId);
return -1; return -1;
} }
@ -1442,7 +1450,8 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
} else { } else {
tqDebug( tqDebug(
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
"completed msg", pTask->id.idStr, req.downstreamId); "completed msg",
pTask->id.idStr, req.downstreamId);
streamProcessScanHistoryFinishRsp(pTask); streamProcessScanHistoryFinishRsp(pTask);
} }
@ -1483,7 +1492,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
return 0; return 0;
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
// todo add one function to handle this // todo add one function to handle this
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId); tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId);
return -1; return -1;
@ -1610,7 +1619,8 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} }
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory &&
pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated); streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
@ -1625,13 +1635,14 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
if (code != 0) { if (code != 0) {
return code; return code;
} }
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); SStreamTask* pHistoryTask =
streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask) { if (pHistoryTask) {
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
} }
@ -1652,7 +1663,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId);
if (pTask == NULL) { if (pTask == NULL) {
// tDeleteStreamDispatchReq(&req); // tDeleteStreamDispatchReq(&req);
return -1; return -1;
} }
@ -1687,7 +1698,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
if (pTask != NULL) { if (pTask != NULL) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
@ -1697,7 +1708,6 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
return 0; return 0;
} else { } else {
tDeleteStreamDispatchReq(&req); tDeleteStreamDispatchReq(&req);
} }
@ -1729,7 +1739,7 @@ FAIL:
pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL; pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;
int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
SRpcMsg rsp = { .code = code, .info = pMsg->info, .contLen = len, .pCont = pRspHead}; SRpcMsg rsp = {.code = code, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code)); tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code));
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
@ -1837,7 +1847,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
bool startTask = vnodeIsRoleLeader(pTq->pVnode); // in case of follower, do not launch task bool startTask = vnodeIsRoleLeader(pTq->pVnode); // in case of follower, do not launch task
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
SStreamTaskNodeUpdateMsg req = {0}; SStreamTaskNodeUpdateMsg req = {0};

View File

@ -88,6 +88,9 @@ _err:
int tsdbClose(STsdb **pTsdb) { int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) { if (*pTsdb) {
STsdb *pdb = *pTsdb;
tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d", TD_VID(pdb->pVnode), pdb->path, pdb->keepCfg.days,
pdb->keepCfg.keep0, pdb->keepCfg.keep1, pdb->keepCfg.keep2);
taosThreadRwlockWrlock(&(*pTsdb)->rwLock); taosThreadRwlockWrlock(&(*pTsdb)->rwLock);
tsdbMemTableDestroy((*pTsdb)->mem, true); tsdbMemTableDestroy((*pTsdb)->mem, true);
(*pTsdb)->mem = NULL; (*pTsdb)->mem = NULL;

View File

@ -1175,7 +1175,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
} }
int streamStateOpenBackend(void* backend, SStreamState* pState) { int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
taosAcquireRef(streamBackendId, pState->streamBackendRid); // taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendWrapper* handle = backend; SBackendWrapper* handle = backend;
SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));
taosThreadMutexLock(&handle->cfMutex); taosThreadMutexLock(&handle->cfMutex);

View File

@ -124,7 +124,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosInitRWLatch(&pMeta->lock); taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL); taosThreadMutexInit(&pMeta->backendMutex, NULL);
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%"PRId64", stage:%" PRId64, vgId, chkpId, stage); qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, chkpId, stage);
return pMeta; return pMeta;
_err: _err:
@ -142,6 +142,7 @@ _err:
} }
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
qDebug("start to close stream meta");
tdbAbort(pMeta->db, pMeta->txn); tdbAbort(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pCheckpointDb); tdbTbClose(pMeta->pCheckpointDb);
@ -167,6 +168,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosArrayDestroy(pMeta->checkpointInUse); taosArrayDestroy(pMeta->checkpointInUse);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
qDebug("end to close stream meta");
} }
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {

View File

@ -128,7 +128,6 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
if (uniqueId == NULL) { if (uniqueId == NULL) {
int code = streamStateOpenBackend(pMeta->streamBackend, pState); int code = streamStateOpenBackend(pMeta->streamBackend, pState);
if (code == -1) { if (code == -1) {
taosReleaseRef(streamBackendId, pState->streamBackendRid);
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
taosMemoryFree(pState); taosMemoryFree(pState);
return NULL; return NULL;
@ -729,7 +728,8 @@ void streamStateFreeVal(void* val) {
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey,key->win.ekey, key->groupId); qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
key->groupId);
return streamStateSessionPut_rocksdb(pState, key, value, vLen); return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else #else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
@ -763,7 +763,8 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey,key->win.ekey, key->groupId); qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
key->groupId);
return streamStateSessionDel_rocksdb(pState, key); return streamStateSessionDel_rocksdb(pState, key);
#else #else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};