diff --git a/example/src/tmq.c b/example/src/tmq.c index 976d658fa6..2ee91c254c 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -32,6 +32,11 @@ static void msg_process(TAOS_RES* msg) { int32_t numOfFields = taos_field_count(msg); taos_print_row(buf, row, fields, numOfFields); printf("%s\n", buf); + + const char* tbName = tmq_get_table_name(msg); + if (tbName) { + printf("from tb: %s\n", tbName); + } } } @@ -101,8 +106,8 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ + pRes = taos_query(pConn, "create topic topic_ctb_column with table as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/client/taos.h b/include/client/taos.h index 26d4d18234..486d5f5fef 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -257,10 +257,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); -// TODO -#if 0 -DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); -#endif +DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res); #if 0 DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6294e14adb..8eb9ae62d8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2492,6 +2492,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i); tlen += taosEncodeSSchemaWrapper(buf, pSW); } + if (pRsp->withTbName) { + char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i); + tlen += taosEncodeString(buf, tbName); + } } } return tlen; @@ -2504,6 +2508,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeFixedI32(buf, &pRsp->blockNum); pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*)); + pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); if (pRsp->blockNum != 0) { buf = taosDecodeFixedI8(buf, &pRsp->withTbName); @@ -2522,6 +2527,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeSSchemaWrapper(buf, pSW); taosArrayPush(pRsp->blockSchema, &pSW); } + if (pRsp->withTbName) { + char* name = NULL; + buf = taosDecodeString(buf, &name); + taosArrayPush(pRsp->blockTbName, &name); + } } } return (void*)buf; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 0ce689f19c..b42f072e54 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1346,3 +1346,16 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { return -1; } } + +const char* tmq_get_table_name(TAOS_RES* res) { + if (TD_RES_TMQ(res)) { + SMqRspObj* pRspObj = (SMqRspObj*)res; + if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || + pRspObj->resIter >= pRspObj->rsp.blockNum) { + return NULL; + } + const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + return name; + } + return NULL; +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 41d4d5f406..00379ecda1 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -297,8 +297,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.subType = TOPIC_SUB_TYPE__TABLE; - topicObj.withTbName = 0; - topicObj.withSchema = 0; + topicObj.withTbName = pCreate->withTbName; + topicObj.withSchema = pCreate->withSchema; SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 1e271bc8d8..f1917d5fea 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -116,10 +116,10 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur); // SMetaDB int metaOpenDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta); -int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); -int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); -int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); -int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); +// int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); +int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); +int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); +int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); #endif #endif @@ -128,4 +128,4 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); } #endif -#endif /*_TD_VNODE_META_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_META_H_*/ diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 8d2a4ebcf3..7d0a87d79d 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -158,7 +158,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo skmDbKey.sver = sver; pKey = &skmDbKey; kLen = sizeof(skmDbKey); + metaRLock(pMeta); ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen); + metaULock(pMeta); if (ret < 0) { return NULL; } @@ -181,6 +183,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo } struct SMCtbCursor { + SMeta *pMeta; TDBC *pCur; tb_uid_t suid; void *pKey; @@ -200,9 +203,13 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { return NULL; } + pCtbCur->pMeta = pMeta; pCtbCur->suid = uid; + metaRLock(pMeta); + ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL); if (ret < 0) { + metaULock(pMeta); taosMemoryFree(pCtbCur); return NULL; } @@ -220,6 +227,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) { if (pCtbCur) { + if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta); if (pCtbCur->pCur) { tdbDbcClose(pCtbCur->pCur); @@ -269,7 +277,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { pSW = metaGetTableSchema(pMeta, quid, sver, 0); if (!pSW) return NULL; - + tdInitTSchemaBuilder(&sb, 0); for (int i = 0; i < pSW->nCols; i++) { pSchema = pSW->pSchema + i; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b49c148ac5..dac1caff69 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -427,9 +427,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pReq->currentOffset; rsp.withSchema = pExec->withSchema; + rsp.withTbName = pExec->withTbName; + rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockSchema = taosArrayInit(0, sizeof(void*)); + rsp.blockTbName = taosArrayInit(0, sizeof(void*)); while (1) { consumerEpoch = atomic_load_32(&pExec->epoch); @@ -535,6 +538,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayPush(rsp.blockSchema, &pSW); } + if (pExec->withTbName) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; + if (metaGetTableEntryByUid(&mr, uid) < 0) { + ASSERT(0); + } + char* tbName = strdup(mr.me.name); + taosArrayPush(rsp.blockTbName, &tbName); + metaReaderClear(&mr); + } + rsp.blockNum++; } // db subscribe @@ -563,6 +578,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ASSERT(actualLen <= dataStrLen); taosArrayPush(rsp.blockDataLen, &actualLen); taosArrayPush(rsp.blockData, &buf); + if (pExec->withTbName) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) { + ASSERT(0); + } + char* tbName = strdup(mr.me.name); + taosArrayPush(rsp.blockTbName, &tbName); + metaReaderClear(&mr); + } SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); taosArrayPush(rsp.blockSchema, &pSW); @@ -614,6 +639,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockDataLen); taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree); return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 031d200a66..07a68d780a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -485,8 +485,10 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { for (int i = 1; i < pCommith->niters; i++) { tSkipListDestroyIter(pCommith->iters[i].pIter); - tdFreeSchema(pCommith->iters[i].pTable->pSchema); - taosMemoryFree(pCommith->iters[i].pTable); + if (pCommith->iters[i].pTable) { + tdFreeSchema(pCommith->iters[i].pTable->pSchema); + taosMemoryFreeClear(pCommith->iters[i].pTable); + } } taosMemoryFree(pCommith->iters); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 1260f9a3e7..f0f5338c4d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -72,6 +72,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t ret = 0; SMsgCb *pMsgCb = rpcHandle; if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { + pMsg->noResp = 1; tmsgSendReq(rpcHandle, pEpSet, pMsg); } else { vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index aa05687426..8574e071f2 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -242,7 +242,7 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) { int h; h = tdbPCachePageHash(&(pPage->pgid)); - for (ppPage = &(pCache->pgHash[h % pCache->nHash]); *ppPage != pPage; ppPage = &((*ppPage)->pHashNext)) + for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext)) ; ASSERT(*ppPage == pPage); *ppPage = pPage->pHashNext;