fix: handle insufficient resource
This commit is contained in:
parent
9fb5c8f3a1
commit
b147ba2812
|
@ -816,7 +816,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
// TODO version should be assigned and refed during preprocess
|
// TODO version should be assigned and refed during preprocess
|
||||||
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
|
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
|
||||||
if (pRef == NULL) {
|
if (pRef == NULL) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int64_t ver = pRef->refVer;
|
int64_t ver = pRef->refVer;
|
||||||
|
@ -837,12 +836,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
|
|
||||||
pHandle->execHandle.task =
|
pHandle->execHandle.task =
|
||||||
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL);
|
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL);
|
||||||
ASSERT(pHandle->execHandle.task);
|
|
||||||
void* scanner = NULL;
|
void* scanner = NULL;
|
||||||
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
|
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
|
||||||
ASSERT(scanner);
|
|
||||||
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
||||||
ASSERT(pHandle->execHandle.pExecReader);
|
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||||
|
@ -875,8 +871,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||||
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
// TODO
|
return -1;
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
|
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
|
||||||
|
@ -886,8 +881,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
taosMemoryFree(req.qmsg);
|
taosMemoryFree(req.qmsg);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
// TODO
|
return -1;
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
// close handle
|
// close handle
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,17 +71,14 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
||||||
|
|
||||||
int32_t tqMetaOpen(STQ* pTq) {
|
int32_t tqMetaOpen(STQ* pTq) {
|
||||||
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) {
|
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
|
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
|
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,40 +194,49 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t vlen;
|
int32_t vlen;
|
||||||
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
||||||
ASSERT(code == 0);
|
|
||||||
|
|
||||||
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey),
|
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey),
|
||||||
pHandle->consumerId, TD_VID(pTq->pVnode));
|
pHandle->consumerId, TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, vlen);
|
void* buf = taosMemoryCalloc(1, vlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
ASSERT(0);
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, buf, vlen);
|
tEncoderInit(&encoder, buf, vlen);
|
||||||
|
|
||||||
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
|
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
|
||||||
ASSERT(0);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
TXN* txn;
|
TXN* txn;
|
||||||
|
|
||||||
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||||
0) {
|
0) {
|
||||||
ASSERT(0);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) {
|
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) {
|
||||||
ASSERT(0);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
|
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
|
||||||
ASSERT(0);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
|
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
|
||||||
ASSERT(0);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
Loading…
Reference in New Issue