enh(tmq): support restart
This commit is contained in:
parent
823d47672d
commit
a9c54ae701
|
@ -51,6 +51,47 @@ int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_
|
|||
return strcmp(pKey1, pKey2);
|
||||
}
|
||||
|
||||
int32_t tqStoreExec(STQ* pTq, const char* key, const STqExec* pExec) {
|
||||
int32_t code;
|
||||
int32_t vlen;
|
||||
tEncodeSize(tEncodeSTqExec, pExec, vlen, code);
|
||||
ASSERT(code == 0);
|
||||
|
||||
void* buf = taosMemoryCalloc(1, vlen);
|
||||
if (buf == NULL) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, buf, vlen);
|
||||
|
||||
if (tEncodeSTqExec(&encoder, pExec) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
TXN txn;
|
||||
|
||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
taosMemoryFree(buf);
|
||||
return 0;
|
||||
}
|
||||
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
|
||||
if (pTq == NULL) {
|
||||
|
@ -96,8 +137,31 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
|||
int vLen;
|
||||
|
||||
tdbTbcMoveToFirst(pCur);
|
||||
SDecoder decoder;
|
||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||
// create, put into execsj
|
||||
STqExec exec;
|
||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||
tDecodeSTqExec(&decoder, &exec);
|
||||
exec.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||
if (exec.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
|
||||
SReadHandle handle = {
|
||||
.reader = exec.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
};
|
||||
exec.task[i] = qCreateStreamExecTaskInfo(exec.qmsg, &handle);
|
||||
ASSERT(exec.task[i]);
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
exec.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
}
|
||||
exec.pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
}
|
||||
taosHashPut(pTq->execs, pKey, kLen, &exec, sizeof(STqExec));
|
||||
}
|
||||
|
||||
if (tdbTxnClose(&txn) < 0) {
|
||||
|
@ -604,7 +668,9 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn);
|
||||
if (tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
}
|
||||
|
||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
|
@ -659,60 +725,21 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
}
|
||||
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
|
||||
|
||||
int32_t code;
|
||||
int32_t vlen;
|
||||
tEncodeSize(tEncodeSTqExec, pExec, vlen, code);
|
||||
ASSERT(code == 0);
|
||||
|
||||
void* buf = taosMemoryCalloc(1, vlen);
|
||||
if (buf == NULL) {
|
||||
ASSERT(0);
|
||||
if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, buf, vlen);
|
||||
|
||||
if (tEncodeSTqExec(&encoder, pExec) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
TXN txn;
|
||||
|
||||
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbBegin(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbTbUpsert(pTq->pExecStore, req.subKey, (int)strlen(req.subKey), buf, vlen, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbCommit(pTq->pMetaStore, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
taosMemoryFree(buf);
|
||||
|
||||
return 0;
|
||||
} else {
|
||||
/*if (req.newConsumerId != -1) {*/
|
||||
/*taosWLockLatch(&pExec->lock);*/
|
||||
ASSERT(pExec->consumerId == req.oldConsumerId);
|
||||
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
|
||||
// TODO handle qmsg and exec modification
|
||||
atomic_store_32(&pExec->epoch, -1);
|
||||
atomic_store_64(&pExec->consumerId, req.newConsumerId);
|
||||
atomic_add_fetch_32(&pExec->epoch, 1);
|
||||
/*taosWUnLockLatch(&pExec->lock);*/
|
||||
|
||||
if (tqStoreExec(pTq, req.subKey, pExec) < 0) {
|
||||
// TODO
|
||||
}
|
||||
return 0;
|
||||
/*} else {*/
|
||||
// TODO
|
||||
/*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/
|
||||
/*return 0;*/
|
||||
/*}*/
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue