Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format

This commit is contained in:
Hongze Cheng 2022-05-12 05:28:17 +00:00
commit 818ea3c79c
11 changed files with 79 additions and 17 deletions

View File

@ -32,6 +32,11 @@ static void msg_process(TAOS_RES* msg) {
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
printf("%s\n", buf); printf("%s\n", buf);
const char* tbName = tmq_get_table_name(msg);
if (tbName) {
printf("from tb: %s\n", tbName);
}
} }
} }
@ -101,8 +106,8 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column with table as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;

View File

@ -257,10 +257,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
#if 0
DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
#endif
#if 0 #if 0
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);

View File

@ -2492,6 +2492,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
tlen += taosEncodeSSchemaWrapper(buf, pSW); tlen += taosEncodeSSchemaWrapper(buf, pSW);
} }
if (pRsp->withTbName) {
char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i);
tlen += taosEncodeString(buf, tbName);
}
} }
} }
return tlen; return tlen;
@ -2504,6 +2508,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI32(buf, &pRsp->blockNum); buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
if (pRsp->blockNum != 0) { if (pRsp->blockNum != 0) {
buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
@ -2522,6 +2527,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeSSchemaWrapper(buf, pSW); buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
} }
if (pRsp->withTbName) {
char* name = NULL;
buf = taosDecodeString(buf, &name);
taosArrayPush(pRsp->blockTbName, &name);
}
} }
} }
return (void*)buf; return (void*)buf;

View File

@ -1346,3 +1346,16 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
return -1; return -1;
} }
} }
const char* tmq_get_table_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
return name;
}
return NULL;
}

View File

@ -297,8 +297,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj.ast = strdup(pCreate->ast); topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.astLen = strlen(pCreate->ast) + 1;
topicObj.subType = TOPIC_SUB_TYPE__TABLE; topicObj.subType = TOPIC_SUB_TYPE__TABLE;
topicObj.withTbName = 0; topicObj.withTbName = pCreate->withTbName;
topicObj.withSchema = 0; topicObj.withSchema = pCreate->withSchema;
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) { if (nodesStringToNode(pCreate->ast, &pAst) != 0) {

View File

@ -116,10 +116,10 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
// SMetaDB // SMetaDB
int metaOpenDB(SMeta* pMeta); int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); // int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
#endif #endif
#endif #endif
@ -128,4 +128,4 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
} }
#endif #endif
#endif /*_TD_VNODE_META_H_*/ #endif /*_TD_VNODE_META_H_*/

View File

@ -158,7 +158,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
skmDbKey.sver = sver; skmDbKey.sver = sver;
pKey = &skmDbKey; pKey = &skmDbKey;
kLen = sizeof(skmDbKey); kLen = sizeof(skmDbKey);
metaRLock(pMeta);
ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen); ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
metaULock(pMeta);
if (ret < 0) { if (ret < 0) {
return NULL; return NULL;
} }
@ -181,6 +183,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
} }
struct SMCtbCursor { struct SMCtbCursor {
SMeta *pMeta;
TDBC *pCur; TDBC *pCur;
tb_uid_t suid; tb_uid_t suid;
void *pKey; void *pKey;
@ -200,9 +203,13 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
return NULL; return NULL;
} }
pCtbCur->pMeta = pMeta;
pCtbCur->suid = uid; pCtbCur->suid = uid;
metaRLock(pMeta);
ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL); ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
if (ret < 0) { if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pCtbCur); taosMemoryFree(pCtbCur);
return NULL; return NULL;
} }
@ -220,6 +227,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) { void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
if (pCtbCur) { if (pCtbCur) {
if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta);
if (pCtbCur->pCur) { if (pCtbCur->pCur) {
tdbDbcClose(pCtbCur->pCur); tdbDbcClose(pCtbCur->pCur);
@ -269,7 +277,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
pSW = metaGetTableSchema(pMeta, quid, sver, 0); pSW = metaGetTableSchema(pMeta, quid, sver, 0);
if (!pSW) return NULL; if (!pSW) return NULL;
tdInitTSchemaBuilder(&sb, 0); tdInitTSchemaBuilder(&sb, 0);
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i; pSchema = pSW->pSchema + i;

View File

@ -427,9 +427,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataBlkRsp rsp = {0}; SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset; rsp.reqOffset = pReq->currentOffset;
rsp.withSchema = pExec->withSchema; rsp.withSchema = pExec->withSchema;
rsp.withTbName = pExec->withTbName;
rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockSchema = taosArrayInit(0, sizeof(void*)); rsp.blockSchema = taosArrayInit(0, sizeof(void*));
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
while (1) { while (1) {
consumerEpoch = atomic_load_32(&pExec->epoch); consumerEpoch = atomic_load_32(&pExec->epoch);
@ -535,6 +538,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayPush(rsp.blockSchema, &pSW); taosArrayPush(rsp.blockSchema, &pSW);
} }
if (pExec->withTbName) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
if (metaGetTableEntryByUid(&mr, uid) < 0) {
ASSERT(0);
}
char* tbName = strdup(mr.me.name);
taosArrayPush(rsp.blockTbName, &tbName);
metaReaderClear(&mr);
}
rsp.blockNum++; rsp.blockNum++;
} }
// db subscribe // db subscribe
@ -563,6 +578,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(actualLen <= dataStrLen); ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen); taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf); taosArrayPush(rsp.blockData, &buf);
if (pExec->withTbName) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {
ASSERT(0);
}
char* tbName = strdup(mr.me.name);
taosArrayPush(rsp.blockTbName, &tbName);
metaReaderClear(&mr);
}
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
taosArrayPush(rsp.blockSchema, &pSW); taosArrayPush(rsp.blockSchema, &pSW);
@ -614,6 +639,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen); taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
return 0; return 0;
} }

View File

@ -485,8 +485,10 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
for (int i = 1; i < pCommith->niters; i++) { for (int i = 1; i < pCommith->niters; i++) {
tSkipListDestroyIter(pCommith->iters[i].pIter); tSkipListDestroyIter(pCommith->iters[i].pIter);
tdFreeSchema(pCommith->iters[i].pTable->pSchema); if (pCommith->iters[i].pTable) {
taosMemoryFree(pCommith->iters[i].pTable); tdFreeSchema(pCommith->iters[i].pTable->pSchema);
taosMemoryFreeClear(pCommith->iters[i].pTable);
}
} }
taosMemoryFree(pCommith->iters); taosMemoryFree(pCommith->iters);

View File

@ -72,6 +72,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t ret = 0; int32_t ret = 0;
SMsgCb *pMsgCb = rpcHandle; SMsgCb *pMsgCb = rpcHandle;
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
pMsg->noResp = 1;
tmsgSendReq(rpcHandle, pEpSet, pMsg); tmsgSendReq(rpcHandle, pEpSet, pMsg);
} else { } else {
vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);

View File

@ -242,7 +242,7 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
int h; int h;
h = tdbPCachePageHash(&(pPage->pgid)); h = tdbPCachePageHash(&(pPage->pgid));
for (ppPage = &(pCache->pgHash[h % pCache->nHash]); *ppPage != pPage; ppPage = &((*ppPage)->pHashNext)) for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
; ;
ASSERT(*ppPage == pPage); ASSERT(*ppPage == pPage);
*ppPage = pPage->pHashNext; *ppPage = pPage->pHashNext;