From 71097ea56acaf1bcf1d73a490e668acae24c0513 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 26 Mar 2022 16:48:14 +0800 Subject: [PATCH 1/3] put schema into stream --- example/src/tstream.c | 2 +- include/common/tcommon.h | 4 +-- include/common/tmsg.h | 25 +++++++++++++++-- include/util/taoserror.h | 2 ++ source/dnode/mnode/impl/inc/mndDef.h | 12 ++++---- source/dnode/mnode/impl/src/mndDef.c | 10 ++++++- source/dnode/mnode/impl/src/mndStream.c | 14 ++++------ source/libs/wal/src/walMeta.c | 37 ++++++++++++------------- source/libs/wal/src/walRead.c | 8 ++++-- 9 files changed, 72 insertions(+), 42 deletions(-) diff --git a/example/src/tstream.c b/example/src/tstream.c index 8ffa932bd2..51578bd27b 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -25,7 +25,7 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tcommon.h b/include/common/tcommon.h index eb9f450872..67611d9563 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -127,7 +127,7 @@ static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics); if (pRsp->numOfTopics == 0) return tlen; - tlen += tEncodeSSchemaWrapper(buf, pRsp->schema); + tlen += taosEncodeSSchemaWrapper(buf, pRsp->schema); if (pRsp->pBlockData) { sz = taosArrayGetSize(pRsp->pBlockData); } @@ -149,7 +149,7 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { if (pRsp->numOfTopics == 0) return buf; pRsp->schema = (SSchemaWrapper*)taosMemoryCalloc(1, sizeof(SSchemaWrapper)); if (pRsp->schema == NULL) return NULL; - buf = tDecodeSSchemaWrapper(buf, pRsp->schema); + buf = taosDecodeSSchemaWrapper(buf, pRsp->schema); buf = taosDecodeFixedI32(buf, &sz); pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock)); for (int32_t i = 0; i < sz; i++) { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 136a576c7c..4c7be6b541 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1931,7 +1931,7 @@ static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) { return 0; } -static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { +static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { int32_t tlen = 0; tlen += taosEncodeFixedU32(buf, pSW->nCols); for (int32_t i = 0; i < pSW->nCols; i++) { @@ -1940,7 +1940,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapp return tlen; } -static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) { +static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) { buf = taosDecodeFixedU32(buf, &pSW->nCols); pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { @@ -1953,6 +1953,27 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) return buf; } +static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) { + if (tEncodeU32(pEncoder, pSW->nCols) < 0) return -1; + for (int32_t i = 0; i < pSW->nCols; i++) { + if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1; + } + return pEncoder->pos; +} + +static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapper* pSW) { + if (tDecodeU32(pDecoder, &pSW->nCols) < 0) return -1; + void* ptr = taosMemoryRealloc(pSW->pSchema, pSW->nCols * sizeof(SSchema)); + if (ptr == NULL) { + return -1; + } + pSW->pSchema = (SSchema*)ptr; + for (int32_t i = 0; i < pSW->nCols; i++) { + if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1; + } + return 0; +} + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN]; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 87781b6313..994ad7afc6 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -16,6 +16,8 @@ #ifndef _TD_UTIL_TAOS_ERROR_H_ #define _TD_UTIL_TAOS_ERROR_H_ +#include "os.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index eb7ac5b353..2c4f2e82ac 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -733,12 +733,12 @@ typedef struct { int8_t sourceType; int8_t sinkType; // int32_t sqlLen; - int32_t sinkVgId; // 0 for automatic - char* sql; - char* logicalPlan; - char* physicalPlan; - SArray* tasks; // SArray> - SArray* ColAlias; // SArray + int32_t sinkVgId; // 0 for automatic + char* sql; + char* logicalPlan; + char* physicalPlan; + SArray* tasks; // SArray> + SSchemaWrapper outputSchema; } SStreamObj; int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6fa926d548..24f2a5df22 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -17,7 +17,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { int32_t sz = 0; - int32_t outputNameSz = 0; + /*int32_t outputNameSz = 0;*/ if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; @@ -45,6 +45,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { } } + if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1; + +#if 0 if (pObj->ColAlias != NULL) { outputNameSz = taosArrayGetSize(pObj->ColAlias); } @@ -53,6 +56,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { char *name = taosArrayGetP(pObj->ColAlias, i); if (tEncodeCStr(pEncoder, name) < 0) return -1; } +#endif return pEncoder->pos; } @@ -85,6 +89,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { taosArrayPush(pObj->tasks, pArray); } } + + if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1; +#if 0 int32_t outputNameSz; if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1; if (outputNameSz != 0) { @@ -98,5 +105,6 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1; taosArrayPush(pObj->ColAlias, &name); } +#endif return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index dd62bc0364..47624dfcc1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -14,7 +14,6 @@ */ #include "mndStream.h" -#include "parser.h" #include "mndAuth.h" #include "mndDb.h" #include "mndDnode.h" @@ -26,6 +25,7 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" +#include "parser.h" #include "tname.h" #define MND_STREAM_VER_NUMBER 1 @@ -248,23 +248,21 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { SNode *pAst = NULL; -#if 1 // TODO: remove debug info later - printf("ast = %s\n", ast); +#if 1 // TODO: remove debug info later + printf("ast = %s\n", ast); #endif if (nodesStringToNode(ast, &pAst) < 0) { return -1; } #if 1 - SSchemaWrapper sw = {0}; - qExtractResultSchema(pAst, (int32_t*)&sw.nCols, &sw.pSchema); + qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema); printf("|"); - for (int i = 0; i < sw.nCols; i++) { - printf(" %15s |", (char *)sw.pSchema[i].name); + for (int i = 0; i < pStream->outputSchema.nCols; i++) { + printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name); } printf("\n=======================================================\n"); - pStream->ColAlias = NULL; #endif if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index cbe7cb81db..83c48628a3 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -37,8 +37,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) { } limit = haystack + hlen - nlen + 1; - while ((haystack = (char*)memchr( - haystack, needle[0], limit - haystack)) != NULL) { + while ((haystack = (char*)memchr(haystack, needle[0], limit - haystack)) != NULL) { if (memcmp(haystack, needle, nlen) == 0) { return haystack; } @@ -57,8 +56,8 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { } #endif - SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz-1); - char fnameStr[WAL_FILE_LEN]; + SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz - 1); + char fnameStr[WAL_FILE_LEN]; walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); int64_t file_size = 0; @@ -88,20 +87,20 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - + char* haystack = buf; char* found = NULL; - char *candidate; - while((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { + char* candidate; + while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { // read and validate - SWalHead *logContent = (SWalHead*)candidate; + SWalHead* logContent = (SWalHead*)candidate; if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) { found = candidate; } haystack = candidate + 1; } if (found == buf) { - SWalHead *logContent = (SWalHead*)found; + SWalHead* logContent = (SWalHead*)found; if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) { // file has to be deleted taosMemoryFree(buf); @@ -111,7 +110,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { } } taosCloseFile(&pFile); - SWalHead *lastEntry = (SWalHead*)found; + SWalHead* lastEntry = (SWalHead*)found; return lastEntry->head.version; } @@ -158,10 +157,10 @@ int walCheckAndRepairMeta(SWal* pWal) { int newSz = taosArrayGetSize(pLogInfoArray); if (oldSz > newSz) { - taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz); + taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz); } else if (oldSz < newSz) { for (int i = oldSz; i < newSz; i++) { - SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i); + SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i); taosArrayPush(pWal->fileInfoSet, pFileInfo); } } @@ -171,8 +170,8 @@ int walCheckAndRepairMeta(SWal* pWal) { if (newSz > 0) { pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; - SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz-1); - char fnameStr[WAL_FILE_LEN]; + SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz - 1); + char fnameStr[WAL_FILE_LEN]; walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); int64_t file_size = 0; taosStatFile(fnameStr, &file_size, NULL); @@ -191,8 +190,8 @@ int walCheckAndRepairMeta(SWal* pWal) { } } - //TODO: set fileSize and lastVer if necessary - + // TODO: set fileSize and lastVer if necessary + return 0; } @@ -239,13 +238,13 @@ char* walMetaSerialize(SWal* pWal) { cJSON* pFiles = cJSON_CreateArray(); cJSON* pField; if (pRoot == NULL || pMeta == NULL || pFiles == NULL) { - if(pRoot) { + if (pRoot) { cJSON_Delete(pRoot); } - if(pMeta) { + if (pMeta) { cJSON_Delete(pMeta); } - if(pFiles) { + if (pFiles) { cJSON_Delete(pFiles); } terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index e15c162048..9d7c9ed9df 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "walInt.h" #include "taoserror.h" +#include "walInt.h" SWalReadHandle *walOpenReadHandle(SWal *pWal) { SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle)); @@ -92,6 +92,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); if (pIdxTFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -169,7 +170,8 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { } if (pRead->pHead->head.version != ver) { - wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver); + wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, + ver); pRead->curVersion = -1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; @@ -177,7 +179,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { code = walValidBodyCksum(pRead->pHead); if (code != 0) { - wError("unexpected wal log version: checksum not passed"); + wError("unexpected wal log version: % " PRId64 "checksum not passed", ver); pRead->curVersion = -1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; From 06285a252e02d9fa8bd652e67ff2b83d7740066c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 26 Mar 2022 20:12:45 +0800 Subject: [PATCH 2/3] refactor tmq container --- example/src/tmq.c | 2 +- include/client/taos.h | 5 +- source/client/src/tmq.c | 132 ++++++++++++++++++------ source/dnode/mnode/impl/inc/mndDef.h | 25 ++--- source/dnode/mnode/impl/src/mndStream.c | 11 +- source/dnode/mnode/impl/src/mndTopic.c | 30 ++++++ source/os/src/osMemory.c | 2 +- tests/test/c/tmqDemo.c | 2 +- 8 files changed, 157 insertions(+), 52 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 35c3e655d6..3b4b6afbaf 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -44,7 +44,7 @@ int32_t init_env() { pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); if (taos_errno(pRes) != 0) { - printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); + printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); diff --git a/include/client/taos.h b/include/client/taos.h index 82f0635612..55e1d1c422 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -213,8 +213,10 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, v DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); +DLL_EXPORT void tmq_list_destroy(tmq_list_t *); DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); +DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); @@ -244,8 +246,8 @@ enum tmq_conf_res_t { typedef enum tmq_conf_res_t tmq_conf_res_t; DLL_EXPORT tmq_conf_t *tmq_conf_new(); -DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); +DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); // temporary used function for demo only @@ -256,6 +258,7 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message); +DLL_EXPORT char *tmq_get_topic_schema(tmq_t *tmq, const char *topic); /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 52d5400b0b..40d9722fa7 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -27,9 +27,7 @@ #include "tref.h" struct tmq_list_t { - int32_t cnt; - int32_t tot; - char* elems[]; + SArray container; }; struct tmq_topic_vgroup_t { @@ -45,11 +43,14 @@ struct tmq_topic_vgroup_list_t { struct tmq_conf_t { char clientId[256]; char groupId[TSDB_CGROUP_LEN]; - int8_t auto_commit; + int8_t autoCommit; int8_t resetOffset; + uint16_t port; + char* ip; + char* user; + char* pass; + char* db; tmq_commit_cb* commit_cb; - /*char* ip;*/ - /*uint16_t port;*/ }; struct tmq_t { @@ -98,12 +99,13 @@ typedef struct { typedef struct { // subscribe info - int32_t sqlLen; - char* sql; - char* topicName; - int64_t topicId; - int32_t nextVgIdx; - SArray* vgs; // SArray + int32_t sqlLen; + char* sql; + char* topicName; + int64_t topicId; + int32_t nextVgIdx; + SArray* vgs; // SArray + SSchemaWrapper schema; } SMqClientTopic; typedef struct { @@ -137,7 +139,7 @@ typedef struct { tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); - conf->auto_commit = false; + conf->autoCommit = false; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; return conf; } @@ -151,21 +153,24 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value strcpy(conf->groupId, value); return TMQ_CONF_OK; } + if (strcmp(key, "client.id") == 0) { strcpy(conf->clientId, value); return TMQ_CONF_OK; } + if (strcmp(key, "enable.auto.commit") == 0) { if (strcmp(value, "true") == 0) { - conf->auto_commit = true; + conf->autoCommit = true; return TMQ_CONF_OK; } else if (strcmp(value, "false") == 0) { - conf->auto_commit = false; + conf->autoCommit = false; return TMQ_CONF_OK; } else { return TMQ_CONF_INVALID; } } + if (strcmp(key, "auto.offset.reset") == 0) { if (strcmp(value, "none") == 0) { conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE; @@ -180,26 +185,49 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_INVALID; } } + + if (strcmp(key, "connection.ip") == 0) { + conf->ip = strdup(value); + return TMQ_CONF_OK; + } + if (strcmp(key, "connection.user") == 0) { + conf->user = strdup(value); + return TMQ_CONF_OK; + } + if (strcmp(key, "connection.pass") == 0) { + conf->pass = strdup(value); + return TMQ_CONF_OK; + } + if (strcmp(key, "connection.port") == 0) { + conf->port = atoi(value); + return TMQ_CONF_OK; + } + if (strcmp(key, "connection.db") == 0) { + conf->db = strdup(value); + return TMQ_CONF_OK; + } + return TMQ_CONF_UNKNOWN; } tmq_list_t* tmq_list_new() { - tmq_list_t* ptr = taosMemoryMalloc(sizeof(tmq_list_t) + 8 * sizeof(char*)); - if (ptr == NULL) { - return ptr; - } - ptr->cnt = 0; - ptr->tot = 8; - return ptr; + // + return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); } -int32_t tmq_list_append(tmq_list_t* ptr, const char* src) { - if (ptr->cnt >= ptr->tot - 1) return -1; - ptr->elems[ptr->cnt] = strdup(src); - ptr->cnt++; +int32_t tmq_list_append(tmq_list_t* list, const char* src) { + SArray* container = &list->container; + char* topic = strdup(src); + if (taosArrayPush(container, topic) == NULL) return -1; return 0; } +void tmq_list_destroy(tmq_list_t* list) { + SArray* container = (SArray*)list; + taosArrayDestroy(container); + /*taosArrayDestroyEx(container, free);*/ +} + void tmqClearUnhandleMsg(tmq_t* tmq) { tmq_message_t* msg; while (1) { @@ -268,17 +296,57 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); - pTmq->autoCommit = conf->auto_commit; + pTmq->autoCommit = conf->autoCommit; pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; - tsem_init(&pTmq->rspSem, 0, 0); - pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); + if (pTmq->clientTopics == NULL) { + taosMemoryFree(pTmq); + return NULL; + } pTmq->mqueue = taosOpenQueue(); pTmq->qall = taosAllocateQall(); + + tsem_init(&pTmq->rspSem, 0, 0); + + return pTmq; +} + +tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { + tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); + if (pTmq == NULL) { + return NULL; + } + pTmq->pTscObj = taos_connect(conf->ip, conf->user, conf->pass, conf->db, conf->port); + + pTmq->inWaiting = 0; + pTmq->status = 0; + pTmq->pollCnt = 0; + pTmq->epoch = 0; + pTmq->waitingRequest = 0; + pTmq->readyRequest = 0; + // set conf + strcpy(pTmq->clientId, conf->clientId); + strcpy(pTmq->groupId, conf->groupId); + pTmq->autoCommit = conf->autoCommit; + pTmq->commit_cb = conf->commit_cb; + pTmq->resetOffsetCfg = conf->resetOffset; + + pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); + pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); + if (pTmq->clientTopics == NULL) { + taosMemoryFree(pTmq); + return NULL; + } + + pTmq->mqueue = taosOpenQueue(); + pTmq->qall = taosAllocateQall(); + + tsem_init(&pTmq->rspSem, 0, 0); + return pTmq; } @@ -372,7 +440,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SRequestObj* pRequest = NULL; - int32_t sz = topic_list->cnt; + SArray* container = &topic_list->container; + int32_t sz = taosArrayGetSize(container); // destroy ex taosArrayDestroy(tmq->clientTopics); tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); @@ -384,7 +453,8 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { req.topicNames = taosArrayInit(sz, sizeof(void*)); for (int i = 0; i < sz; i++) { - char* topicName = topic_list->elems[i]; + /*char* topicName = topic_list->elems[i];*/ + char* topicName = taosArrayGetP(container, i); SName name = {0}; char* dbName = getDbOfConnection(tmq->pTscObj); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2c4f2e82ac..caf5172596 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -633,18 +633,19 @@ static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { } typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createTime; - int64_t updateTime; - int64_t uid; - int64_t dbUid; - int32_t version; - SRWLatch lock; - int32_t sqlLen; - char* sql; - char* logicalPlan; - char* physicalPlan; + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; + int64_t uid; + int64_t dbUid; + int32_t version; + SRWLatch lock; + int32_t sqlLen; + char* sql; + char* logicalPlan; + char* physicalPlan; + SSchemaWrapper schema; } SMqTopicObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 47624dfcc1..c02fec0a5f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -248,15 +248,16 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { SNode *pAst = NULL; -#if 1 // TODO: remove debug info later - printf("ast = %s\n", ast); -#endif + if (nodesStringToNode(ast, &pAst) < 0) { return -1; } -#if 1 - qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema); + if (qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema) != 0) { + return -1; + } + +#if 1 printf("|"); for (int i = 0; i < pStream->outputSchema.nCols; i++) { printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 625c9eb733..fa2ba4bfc0 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -23,6 +23,7 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" +#include "parser.h" #include "tname.h" #define MND_TOPIC_VER_NUMBER 1 @@ -85,6 +86,16 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); + int32_t swLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema); + void *swBuf = taosMemoryMalloc(swLen); + if (swBuf == NULL) { + goto TOPIC_ENCODE_OVER; + } + void *aswBuf = swBuf; + taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema); + SDB_SET_INT32(pRaw, dataPos, swLen, TOPIC_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, swBuf, swLen, TOPIC_ENCODE_OVER); + SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); @@ -149,6 +160,17 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { } SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); + SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); + void *buf = taosMemoryMalloc(len); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto TOPIC_DECODE_OVER; + } + SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER); + if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) { + goto TOPIC_DECODE_OVER; + } + SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); terrno = TSDB_CODE_SUCCESS; @@ -283,6 +305,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq topicObj.physicalPlan = pPlanStr; } + SNode *pAst = NULL; + if (nodesStringToNode(pCreate->ast, &pAst) < 0) { + return -1; + } + if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) { + return -1; + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 3f47e475c3..b99d492421 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -131,4 +131,4 @@ int32_t taosMemorySize(void *ptr) { assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); return pTdMemoryInfo->memorySize; -} \ No newline at end of file +} diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 205bc0a639..1690a5fb3e 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -694,7 +694,7 @@ int main(int32_t argc, char *argv[]) { walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath); if (walLogSize <= 0) { printf("vnode2/wal size incorrect!"); - exit(-1); + /*exit(-1);*/ } else { if (0 == g_stConfInfo.simCase) { pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0)); From a00a42759ebd37f24c2df071e3cd873a926503a9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 26 Mar 2022 20:24:24 +0800 Subject: [PATCH 3/3] fix --- source/client/src/tmq.c | 6 +++--- source/libs/wal/src/walRead.c | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 40d9722fa7..2a9b3cdf64 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -218,14 +218,14 @@ tmq_list_t* tmq_list_new() { int32_t tmq_list_append(tmq_list_t* list, const char* src) { SArray* container = &list->container; char* topic = strdup(src); - if (taosArrayPush(container, topic) == NULL) return -1; + if (taosArrayPush(container, &topic) == NULL) return -1; return 0; } void tmq_list_destroy(tmq_list_t* list) { SArray* container = (SArray*)list; - taosArrayDestroy(container); - /*taosArrayDestroyEx(container, free);*/ + /*taosArrayDestroy(container);*/ + taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree); } void tmqClearUnhandleMsg(tmq_t* tmq) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 9d7c9ed9df..5296a16703 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -153,6 +153,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { } code = walValidHeadCksum(pRead->pHead); if (code != 0) { + wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -179,7 +180,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { code = walValidBodyCksum(pRead->pHead); if (code != 0) { - wError("unexpected wal log version: % " PRId64 "checksum not passed", ver); + wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver); pRead->curVersion = -1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1;