diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ad3f8cc869..06ff6329e0 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -96,7 +96,8 @@ struct STQ { SHashObj* pStreamTasks; SVnode* pVnode; SWal* pWal; - TDB* pTdb; + TDB* pMetaStore; + TTB* pExecStore; }; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bd48ed9b4c..979db9169e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,6 +14,7 @@ */ #include "tq.h" +#include "tdbInt.h" int32_t tqInit() { int8_t old; @@ -46,6 +47,10 @@ void tqCleanUp() { } } +int tqExecKeyCompare(const void* pKey1, int32_t kLen1, const void* pKey2, int32_t kLen2) { + return strcmp(pKey1, pKey2); +} + STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); if (pTq == NULL) { @@ -55,9 +60,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { pTq->path = strdup(path); pTq->pVnode = pVnode; pTq->pWal = pWal; - if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) { - ASSERT(0); - } pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -65,6 +67,43 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + if (tdbOpen(path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { + ASSERT(0); + } + + if (tdbTbOpen("exec", -1, -1, tqExecKeyCompare, pTq->pMetaStore, &pTq->pExecStore) < 0) { + ASSERT(0); + } + + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + ASSERT(0); + } + + /*if (tdbBegin(pTq->pMetaStore, &txn) < 0) {*/ + /*ASSERT(0);*/ + /*}*/ + + TBC* pCur; + if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) { + ASSERT(0); + } + + void* pKey; + int kLen; + void* pVal; + int vLen; + + tdbTbcMoveToFirst(pCur); + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + // create, put into execsj + } + + if (tdbTxnClose(&txn) < 0) { + ASSERT(0); + } + return pTq; } @@ -74,7 +113,7 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->execs); taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pushMgr); - tdbClose(pTq->pTdb); + tdbClose(pTq->pMetaStore); taosMemoryFree(pTq); } // TODO @@ -91,7 +130,6 @@ int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) { if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1; if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1; - // TODO encode modified exec } tEndEncode(pEncoder); return pEncoder->pos; @@ -108,7 +146,6 @@ int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) { if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1; if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1; - // TODO decode modified exec } tEndDecode(pDecoder); return 0; @@ -556,6 +593,23 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey)); ASSERT(code == 0); + + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaStore, &txn) < 0) { + ASSERT(0); + } + + tdbTbDelete(pTq->pExecStore, pReq->subKey, (int)strlen(pReq->subKey), &txn); + + if (tdbCommit(pTq->pMetaStore, &txn) < 0) { + ASSERT(0); + } + return 0; } @@ -604,6 +658,45 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); + + int32_t code; + int32_t vlen; + tEncodeSize(tEncodeSTqExec, pExec, vlen, code); + ASSERT(code == 0); + + void* buf = taosMemoryCalloc(1, vlen); + if (buf == NULL) { + ASSERT(0); + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, vlen); + + if (tEncodeSTqExec(&encoder, pExec) < 0) { + ASSERT(0); + } + + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaStore, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbUpsert(pTq->pExecStore, req.subKey, (int)strlen(req.subKey), buf, vlen, &txn) < 0) { + ASSERT(0); + } + + if (tdbCommit(pTq->pMetaStore, &txn) < 0) { + ASSERT(0); + } + + tEncoderClear(&encoder); + taosMemoryFree(buf); + return 0; } else { /*if (req.newConsumerId != -1) {*/ diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index be8d786de2..9f4c5fc81e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -83,11 +83,11 @@ bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) { int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid, int32_t* pNumOfRows, int16_t* pNumOfCols) { - /*int32_t sversion = pHandle->pBlock->sversion;*/ - // TODO set to real sversion *pUid = 0; - int32_t sversion = 1; + // TODO set to real sversion + /*int32_t sversion = 1;*/ + int32_t sversion = htonl(pHandle->pBlock->sversion); if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); if (pHandle->pSchema == NULL) {