diff --git a/example/src/tmq.c b/example/src/tmq.c index 094fd94bfc..35c3e655d6 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -160,7 +160,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1000); if (tmqmessage) { msg_process(tmqmessage); tmq_message_destroy(tmqmessage); diff --git a/include/client/taos.h b/include/client/taos.h index 2c8135c8ff..8b1517c6ff 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -224,10 +224,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); -#if 0 -DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); -DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics); -#endif +DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); +DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); #if 0 diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 37d20cdb97..a04e2afc94 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -49,6 +49,11 @@ enum { TMQ_CONF__RESET_OFFSET__NONE = -3, }; +enum { + TMQ_MSG_TYPE__POLL_RSP = 0, + TMQ_MSG_TYPE__EP_RSP, +}; + typedef struct { uint32_t numOfTables; SArray* pGroupList; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e7b2dfed89..ab0472a575 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1382,8 +1382,6 @@ typedef struct SMqCMGetSubEpReq { char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqCMGetSubEpReq; -#pragma pack(pop) - static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pMsg->contLen); @@ -1853,6 +1851,12 @@ typedef struct { SMqTbData* tbData; } SMqTopicData; +typedef struct { + int8_t mqMsgType; + int32_t code; + int32_t epoch; +} SMqRspHead; + typedef struct { int64_t consumerId; SSchemaWrapper* schemas; @@ -1869,6 +1873,7 @@ typedef struct { int64_t consumerId; int64_t blockingTime; + int32_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; int64_t currentOffset; @@ -1888,11 +1893,19 @@ typedef struct { typedef struct { int64_t consumerId; - int32_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray* topics; // SArray } SMqCMGetSubEpRsp; +struct tmq_message_t { + SMqRspHead head; + union { + SMqConsumeRsp consumeRsp; + SMqCMGetSubEpRsp getEpRsp; + }; + void* extra; +}; + static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { @@ -1945,7 +1958,6 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pRsp->consumerId); - tlen += taosEncodeFixedI32(buf, pRsp->epoch); tlen += taosEncodeString(buf, pRsp->cgroup); int32_t sz = taosArrayGetSize(pRsp->topics); tlen += taosEncodeFixedI32(buf, sz); @@ -1958,7 +1970,6 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { buf = taosDecodeFixedI64(buf, &pRsp->consumerId); - buf = taosDecodeFixedI32(buf, &pRsp->epoch); buf = taosDecodeStringTo(buf, pRsp->cgroup); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); @@ -1974,6 +1985,8 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p return buf; } +#pragma pack(pop) + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 45c1858948..e9e64a78b8 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -13,19 +13,17 @@ * along with this program. If not, see . */ -#include "clientInt.h" -#include "trpc.h" #include "catalog.h" +#include "clientInt.h" #include "clientLog.h" +#include "trpc.h" static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); -static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { - return 0; -} +static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t code = 0; @@ -39,8 +37,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray); for (int32_t i = 0; i < numOfBatchs; ++i) { SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i); - tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid); - + tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->uid); + if (rsp->vgVersion < 0) { code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); } else { @@ -60,8 +58,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog taosHashCleanup(vgInfo.vgHash); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - } - + } + catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo); } @@ -106,8 +104,8 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_SUCCESS; } -static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { - SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); +static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { + SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); return TSDB_CODE_SUCCESS; @@ -116,7 +114,7 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; tscDebug("hb got %d rsp kv", kvNum); - + for (int32_t i = 0; i < kvNum; ++i) { SKv *kv = taosArrayGet(pRsp->info, i); switch (kv->key) { @@ -126,30 +124,30 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs break; } - int64_t *clusterId = (int64_t *)info->param; + int64_t *clusterId = (int64_t *)info->param; struct SCatalog *pCatalog = NULL; - + int32_t code = catalogGetHandle(*clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); break; } hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); break; } - case HEARTBEAT_KEY_STBINFO:{ + case HEARTBEAT_KEY_STBINFO: { if (kv->valueLen <= 0 || NULL == kv->value) { tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value); break; } - int64_t *clusterId = (int64_t *)info->param; + int64_t *clusterId = (int64_t *)info->param; struct SCatalog *pCatalog = NULL; - + int32_t code = catalogGetHandle(*clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); break; } @@ -165,22 +163,22 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs return TSDB_CODE_SUCCESS; } -static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t hbMqAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) { static int32_t emptyRspNum = 0; if (code != 0) { tfree(param); return -1; } - char *key = (char *)param; + char *key = (char *)param; SClientHbBatchRsp pRsp = {0}; tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp); - + int32_t rspNum = taosArrayGetSize(pRsp.rsps); - SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); + SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL || NULL == *pInst) { - tscError("cluster not exist, key:%s", key); + tscError("cluster not exist, key:%s", key); tfree(param); tFreeClientHbBatchRsp(&pRsp); return -1; @@ -189,13 +187,14 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code tfree(param); if (rspNum) { - tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); + tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, + atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); } else { atomic_add_fetch_32(&emptyRspNum, 1); } for (int32_t i = 0; i < rspNum; ++i) { - SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i); + SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i); code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); if (code) { break; @@ -203,14 +202,14 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code } tFreeClientHbBatchRsp(&pRsp); - + return code; } int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SDbVgVersion *dbs = NULL; - uint32_t dbNum = 0; - int32_t code = 0; + uint32_t dbNum = 0; + int32_t code = 0; code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum); if (TSDB_CODE_SUCCESS != code) { @@ -238,8 +237,8 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SSTableMetaVersion *stbs = NULL; - uint32_t stbNum = 0; - int32_t code = 0; + uint32_t stbNum = 0; + int32_t code = 0; code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum); if (TSDB_CODE_SUCCESS != code) { @@ -254,7 +253,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC SSTableMetaVersion *stb = &stbs[i]; stb->suid = htobe64(stb->suid); stb->sversion = htons(stb->sversion); - stb->tversion = htons(stb->tversion); + stb->tversion = htons(stb->tversion); } SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs}; @@ -266,17 +265,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC return TSDB_CODE_SUCCESS; } - -int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { - int64_t *clusterId = (int64_t *)param; +int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { + int64_t *clusterId = (int64_t *)param; struct SCatalog *pCatalog = NULL; int32_t code = catalogGetHandle(*clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); return code; } - + code = hbGetExpiredDBInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { return code; @@ -287,13 +285,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req return code; } - return TSDB_CODE_SUCCESS; } -int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { - -} +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {} void hbMgrInitMqHbHandle() { clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; @@ -312,10 +307,8 @@ void hbFreeReq(void *req) { tFreeReqKvHash(pReq->info); } - - -SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { - SClientHbBatchReq* pBatchReq = calloc(1, sizeof(SClientHbBatchReq)); +SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { + SClientHbBatchReq *pBatchReq = calloc(1, sizeof(SClientHbBatchReq)); if (pBatchReq == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; @@ -324,11 +317,11 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); int32_t code = 0; - void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); + void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { - SClientHbReq* pOneReq = pIter; + SClientHbReq *pOneReq = pIter; - SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); + SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); if (info) { code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); if (code) { @@ -350,11 +343,10 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { return pBatchReq; } - void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { - SClientHbReq* pOneReq = pIter; + SClientHbReq *pOneReq = pIter; tFreeReqKvHash(pOneReq->info); taosHashClear(pOneReq->info); @@ -363,31 +355,29 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { } } - - -static void* hbThreadFunc(void* param) { +static void *hbThreadFunc(void *param) { setThreadName("hb"); while (1) { int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2); - if(1 == threadStop) { + if (1 == threadStop) { break; } pthread_mutex_lock(&clientHbMgr.lock); int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); - for(int i = 0; i < sz; i++) { - SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); + for (int i = 0; i < sz; i++) { + SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); if (connCnt == 0) { continue; } - SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr); + SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr); if (pReq == NULL) { continue; } - int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); + int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); void *buf = malloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -395,7 +385,7 @@ static void* hbThreadFunc(void* param) { hbClearReqInfo(pAppHbMgr); break; } - + tSerializeSClientHbBatchReq(buf, tlen, pReq); SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo)); @@ -415,17 +405,17 @@ static void* hbThreadFunc(void* param) { pInfo->requestObjRefId = 0; SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo; - int64_t transporterId = 0; - SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); + int64_t transporterId = 0; + SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); - tFreeClientHbBatchReq(pReq, false); + tFreeClientHbBatchReq(pReq, false); hbClearReqInfo(pAppHbMgr); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } pthread_mutex_unlock(&clientHbMgr.lock); - + taosMsleep(HEARTBEAT_INTERVAL); } return NULL; @@ -449,17 +439,18 @@ static void hbStopThread() { tscDebug("hb thread already stopped"); return; } - + while (2 != atomic_load_8(&clientHbMgr.threadStop)) { usleep(10); } - tscDebug("hb thread stopped"); + tscDebug("hb thread stopped"); } -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { +SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { + /*return NULL;*/ hbMgrInit(); - SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); + SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -495,7 +486,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { pthread_mutex_lock(&clientHbMgr.lock); taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); pthread_mutex_unlock(&clientHbMgr.lock); - + return pAppHbMgr; } @@ -504,7 +495,7 @@ void appHbMgrCleanup(void) { int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); for (int i = 0; i < sz; i++) { - SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); + SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); taosHashCleanup(pTarget->activeInfo); pTarget->activeInfo = NULL; taosHashCleanup(pTarget->connInfo); @@ -515,11 +506,12 @@ void appHbMgrCleanup(void) { } int hbMgrInit() { + /*return 0;*/ // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; - clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*)); + clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *)); pthread_mutex_init(&clientHbMgr.lock, NULL); // init handle funcs @@ -534,34 +526,34 @@ int hbMgrInit() { void hbMgrCleanUp() { return; hbStopThread(); - + // destroy all appHbMgr int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); if (old == 0) return; pthread_mutex_lock(&clientHbMgr.lock); appHbMgrCleanup(); - taosArrayDestroy(clientHbMgr.appHbMgrs); + taosArrayDestroy(clientHbMgr.appHbMgrs); pthread_mutex_unlock(&clientHbMgr.lock); - + clientHbMgr.appHbMgrs = NULL; } -int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { +int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { // init hash in activeinfo - void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (data != NULL) { return 0; } SClientHbReq hbReq; hbReq.connKey = connKey; hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - + taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); - + // init hash if (info != NULL) { - SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); info->req = pReq; taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo)); } @@ -570,9 +562,10 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo * return 0; } -int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { +int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { + /*return 0;*/ SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; - SHbConnInfo info = {0}; + SHbConnInfo info = {0}; switch (hbType) { case HEARTBEAT_TYPE_QUERY: { @@ -588,11 +581,12 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int3 default: break; } - + return hbRegisterConnImpl(pAppHbMgr, connKey, &info); } -void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { +void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { + /*return;*/ int32_t code = 0; code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); @@ -602,9 +596,11 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } -int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { +int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen, + int32_t valueLen) { + return 0; // find req by connection id - SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); ASSERT(pReq != NULL); taosHashPut(pReq->info, key, keyLen, value, valueLen); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index d3d9fce210..d9ab23b9fa 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -25,6 +25,7 @@ #include "tglobal.h" #include "tmsgtype.h" #include "tpagedbuf.h" +#include "tqueue.h" #include "tref.h" struct tmq_list_t { @@ -58,22 +59,28 @@ struct tmq_t { char groupId[256]; char clientId[256]; int8_t autoCommit; - SRWLatch lock; int64_t consumerId; int32_t epoch; int32_t resetOffsetCfg; int64_t status; - tsem_t rspSem; STscObj* pTscObj; tmq_commit_cb* commit_cb; int32_t nextTopicIdx; SArray* clientTopics; // SArray + STaosQueue* mqueue; // queue of tmq_message_t + STaosQall* qall; // stat int64_t pollCnt; }; -struct tmq_message_t { - SMqConsumeRsp rsp; +enum { + TMQ_VG_STATUS__IDLE = 0, + TMQ_VG_STATUS__WAIT, +}; + +enum { + TMQ_CONSUMER_STATUS__INIT = 0, + TMQ_CONSUMER_STATUS__READY, }; typedef struct { @@ -83,6 +90,7 @@ typedef struct { int64_t currentOffset; // connection info int32_t vgId; + int32_t vgStatus; SEpSet epSet; } SMqClientVg; @@ -104,15 +112,16 @@ typedef struct { typedef struct { tmq_t* tmq; - int32_t wait; + int32_t sync; + tsem_t rspSem; } SMqAskEpCbParam; typedef struct { - tmq_t* tmq; - SMqClientVg* pVg; - tmq_message_t** retMsg; - tsem_t rspSem; -} SMqConsumeCbParam; + tmq_t* tmq; + SMqClientVg* pVg; + int32_t epoch; + tsem_t rspSem; +} SMqPollCbParam; typedef struct { tmq_t* tmq; @@ -125,7 +134,7 @@ typedef struct { tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); conf->auto_commit = false; - conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST; + conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; return conf; } @@ -209,6 +218,22 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } +tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { + if (*topics == NULL) { + *topics = tmq_list_new(); + } + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i); + tmq_list_append(*topics, strdup(topic->topicName)); + } + return TMQ_RESP_ERR__SUCCESS; +} + +tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) { + tmq_list_t* lst = tmq_list_new(); + return tmq_subscribe(tmq, lst); +} + tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = calloc(sizeof(tmq_t), 1); if (pTmq == NULL) { @@ -218,7 +243,6 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; - taosInitRWLatch(&pTmq->lock); // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -226,9 +250,11 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; - tsem_init(&pTmq->rspSem, 0, 0); pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); + + pTmq->mqueue = taosOpenQueue(); + pTmq->qall = taosAllocateQall(); return pTmq; } @@ -290,7 +316,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in pParam->tmq = tmq; tsem_init(&pParam->rspSem, 0, 0); - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->param = pParam; @@ -365,10 +395,17 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tscError("failed to malloc request"); } - SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; + SMqSubscribeCbParam param = { + .rspErr = TMQ_RESP_ERR__SUCCESS, + .tmq = tmq, + }; tsem_init(¶m.rspSem, 0, 0); - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->param = ¶m; @@ -391,36 +428,6 @@ _return: void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; } -SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { - tmq_t* pTmq = (void*)param; - SArray* pArray = taosArrayInit(0, sizeof(SKv)); - if (pArray == NULL) { - return NULL; - } - SKv kv = {0}; - kv.key = HEARTBEAT_KEY_MQ_TMP; - - SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg)); - if (pMqHb == NULL) { - return pArray; - } - pMqHb->consumerId = connKey.connId; - SArray* clientTopics = pTmq->clientTopics; - int sz = taosArrayGetSize(clientTopics); - for (int i = 0; i < sz; i++) { - SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i); - /*if (pCTopic->vgId == -1) {*/ - /*pMqHb->status = 1;*/ - /*break;*/ - /*}*/ - } - kv.value = pMqHb; - kv.valueLen = sizeof(SMqHbMsg); - taosArrayPush(pArray, &kv); - - return pArray; -} - TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { STscObj* pTscObj = (STscObj*)taos; SRequestObj* pRequest = NULL; @@ -578,7 +585,7 @@ void tmqShowMsg(tmq_message_t* tmq_message) { static bool noPrintSchema; char pBuf[128]; - SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; int32_t colNum = pRsp->schemas->nCols; if (!noPrintSchema) { printf("|"); @@ -618,94 +625,125 @@ void tmqShowMsg(tmq_message_t* tmq_message) { } int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { - SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; - SMqClientVg* pVg = pParam->pVg; + printf("recv poll\n"); + SMqPollCbParam* pParam = (SMqPollCbParam*)param; + SMqClientVg* pVg = pParam->pVg; + tmq_t* tmq = pParam->tmq; if (code != 0) { printf("msg discard\n"); - tsem_post(&pParam->rspSem); + if (pParam->epoch == tmq->epoch) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } return 0; } - SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp)); - if (pRsp == NULL) { - tsem_post(&pParam->rspSem); - return -1; - } - tDecodeSMqConsumeRsp(pMsg->pData, pRsp); - /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ - if (pRsp->numOfTopics == 0) { - /*printf("no data\n");*/ - free(pRsp); - tsem_post(&pParam->rspSem); + int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; + int32_t tmqEpoch = atomic_load_32(&tmq->epoch); + if (msgEpoch < tmqEpoch) { + printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); return 0; } - *pParam->retMsg = (tmq_message_t*)pRsp; - pVg->currentOffset = pRsp->rspOffset; + + if (msgEpoch != tmqEpoch) { + printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); + } + + /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ + tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); + if (pRsp == NULL) { + printf("fail\n"); + return -1; + } + memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); + /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ + if (pRsp->consumeRsp.numOfTopics == 0) { + printf("no data\n"); + if (pParam->epoch == tmq->epoch) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } + taosFreeQitem(pRsp); + return 0; + } + pRsp->extra = pParam->pVg; + taosWriteQitem(tmq->mqueue, pRsp); + printf("poll in queue\n"); + /*pParam->rspMsg = (tmq_message_t*)pRsp;*/ + /*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/ + /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ /*printf("-----msg begin----\n");*/ - tsem_post(&pParam->rspSem); /*printf("\n-----msg end------\n");*/ return 0; } +bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { + bool set = false; + int32_t sz = taosArrayGetSize(pRsp->topics); + if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); + tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); + for (int32_t i = 0; i < sz; i++) { + SMqClientTopic topic = {0}; + SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); + topic.topicName = strdup(pTopicEp->topic); + int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); + topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); + for (int32_t j = 0; j < vgSz; j++) { + SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + SMqClientVg clientVg = { + .pollCnt = 0, + .currentOffset = pVgEp->offset, + .vgId = pVgEp->vgId, + .epSet = pVgEp->epSet, + .vgStatus = TMQ_VG_STATUS__IDLE, + }; + taosArrayPush(topic.vgs, &clientVg); + set = true; + } + taosArrayPush(tmq->clientTopics, &topic); + } + atomic_store_32(&tmq->epoch, epoch); + return set; +} + int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; if (code != 0) { - printf("get topic endpoint error, not ready, wait:%d\n", pParam->wait); - if (pParam->wait) { - tsem_post(&tmq->rspSem); + printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync); + if (pParam->sync) { + tsem_post(&pParam->rspSem); } return 0; } tscDebug("tmq ask ep cb called"); - bool set = false; - SMqCMGetSubEpRsp rsp; - tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); - int32_t sz = taosArrayGetSize(rsp.topics); - // TODO: lock - /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ - /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ - if (rsp.epoch != tmq->epoch) { - // TODO - if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); - tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); - for (int32_t i = 0; i < sz; i++) { - SMqClientTopic topic = {0}; - SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i); - topic.topicName = strdup(pTopicEp->topic); - int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); - topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); - for (int32_t j = 0; j < vgSz; j++) { - SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); - // clang-format off - SMqClientVg clientVg = { - .pollCnt = 0, - .currentOffset = pVgEp->offset, - .vgId = pVgEp->vgId, - .epSet = pVgEp->epSet - }; - // clang-format on - taosArrayPush(topic.vgs, &clientVg); - set = true; - } - taosArrayPush(tmq->clientTopics, &topic); + if (pParam->sync) { + SMqRspHead* head = pMsg->pData; + SMqCMGetSubEpRsp rsp; + tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); + /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ + /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ + int32_t epoch = atomic_load_32(&tmq->epoch); + if (head->epoch > epoch && tmqUpdateEp(tmq, head->epoch, &rsp)) { + atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY); } - tmq->epoch = rsp.epoch; + tsem_post(&pParam->rspSem); + tDeleteSMqCMGetSubEpRsp(&rsp); + } else { + tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp); + taosWriteQitem(tmq->mqueue, pRsp); } - if (set) { - atomic_store_64(&tmq->status, 1); - } - // unlock - /*tsem_post(&tmq->rspSem);*/ - if (pParam->wait) { - tsem_post(&tmq->rspSem); - } - tDeleteSMqCMGetSubEpRsp(&rsp); return 0; } -int32_t tmqAskEp(tmq_t* tmq, bool wait) { +int32_t tmqAskEp(tmq_t* tmq, bool sync) { + printf("ask ep sync %d\n", sync); int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { @@ -722,7 +760,11 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { goto END; } - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam)); if (pParam == NULL) { @@ -730,7 +772,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { goto END; } pParam->tmq = tmq; - pParam->wait = wait; + pParam->sync = sync; + tsem_init(&pParam->rspSem, 0, 0); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; @@ -743,7 +786,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); END: - if (wait) tsem_wait(&tmq->rspSem); + if (sync) tsem_wait(&pParam->rspSem); return 0; } @@ -791,6 +834,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie pReq->blockingTime = blocking_time; pReq->consumerId = tmq->consumerId; + pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; pReq->head.vgId = htonl(pVg->vgId); @@ -798,11 +842,158 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie return pReq; } -tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { - tmq_message_t* tmq_message = NULL; +void tmqClearUnhandleMsg(tmq_t* tmq) { + tmq_message_t* msg; + while (1) { + taosGetQitem(tmq->qall, (void**)&msg); + if (msg) + taosFreeQitem(msg); + else + break; + } + taosReadAllQitems(tmq->mqueue, tmq->qall); + while (1) { + taosGetQitem(tmq->qall, (void**)&msg); + if (msg) + taosFreeQitem(msg); + else + break; + } +} + +int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { + printf("call poll\n"); + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); + if (vgStatus != TMQ_VG_STATUS__IDLE) { + continue; + } + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); + if (pReq == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return -1; + } + SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); + if (param == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return -1; + } + param->tmq = tmq; + param->pVg = pVg; + param->epoch = tmq->epoch; + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); + pRequest->body.requestMsg = (SDataBuf){ + .pData = pReq, + .len = sizeof(SMqConsumeReq), + .handle = NULL, + }; + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = param; + sendInfo->fp = tmqPollCb; + + int64_t transporterId = 0; + printf("send poll\n"); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + pVg->pollCnt++; + tmq->pollCnt++; + } + } + return 0; +} + +// return +int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { + if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) { + printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch); + if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) { + tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp); + tmqClearUnhandleMsg(tmq); + *pReset = true; + } else { + *pReset = false; + } + } else { + return -1; + } + return 0; +} + +tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { + while (1) { + tmq_message_t* rspMsg = NULL; + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) { + break; + } + + if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + printf("handle poll rsp %d\n", rspMsg->head.mqMsgType); + if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) { + printf("epoch match\n"); + SMqClientVg* pVg = rspMsg->extra; + pVg->currentOffset = rspMsg->consumeRsp.rspOffset; + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + return rspMsg; + } else { + printf("epoch mismatch\n"); + taosFreeQitem(rspMsg); + } + } else { + printf("handle ep rsp %d\n", rspMsg->head.mqMsgType); + bool reset = false; + tmqHandleRes(tmq, rspMsg, &reset); + taosFreeQitem(rspMsg); + if (pollIfReset && reset) { + printf("reset and repoll\n"); + tmqPollImpl(tmq, blockingTime); + } + } + } + return NULL; +} + +tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { + tmq_message_t* rspMsg = NULL; + int64_t startTime = taosGetTimestampMs(); + + // TODO: put into another thread or delayed queue int64_t status = atomic_load_64(&tmq->status); - tmqAskEp(tmq, status == 0); + tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); + + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) { + taosReadAllQitems(tmq->mqueue, tmq->qall); + } + tmqHandleAllRsp(tmq, blocking_time, false); + + tmqPollImpl(tmq, blocking_time); + + while (1) { + /*printf("cycle\n");*/ + taosReadAllQitems(tmq->mqueue, tmq->qall); + rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); + if (rspMsg) { + return rspMsg; + } + if (blocking_time != 0) { + int64_t endTime = taosGetTimestampMs(); + if (endTime - startTime > blocking_time) { + printf("normal exit\n"); + return NULL; + } + } + } +} + +#if 0 if (blocking_time <= 0) blocking_time = 1; if (blocking_time > 1000) blocking_time = 1000; @@ -834,7 +1025,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { return NULL; } - SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam)); + SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); if (param == NULL) { ASSERT(false); usleep(blocking_time * 1000); @@ -846,7 +1037,11 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tsem_init(¶m->rspSem, 0, 0); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = pReq, + .len = sizeof(SMqConsumeReq), + .handle = NULL, + }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; @@ -886,6 +1081,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { /*return pRequest;*/ } +#endif #if 0 tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { @@ -929,9 +1125,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; - SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; tDeleteSMqConsumeRsp(pRsp); - free(tmq_message); + taosFreeQitem(tmq_message); } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index db84f87b66..00cf770e6b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -271,7 +271,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { if (cfgAddTimezone(pCfg, "timezone", tsTimezone) != 0) return -1; if (cfgAddLocale(pCfg, "locale", tsLocale) != 0) return -1; if (cfgAddCharset(pCfg, "charset", tsCharset) != 0) return -1; - if (cfgAddBool(pCfg, "enableCoreFile", 0, 1) != 0) return -1; + if (cfgAddBool(pCfg, "enableCoreFile", 1, 1) != 0) return -1; if (cfgAddInt32(pCfg, "numOfCores", tsNumOfCores, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "pageSize(KB)", tsPageSize, 0, INT64_MAX, 1) != 0) return -1; if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, 1) != 0) return -1; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8de3906388..d8c850c6af 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -270,7 +270,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR int32_t rspNum = 0; if (tDecodeI32(&decoder, &rspNum) < 0) return -1; if (pBatchRsp->rsps == NULL) { - pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbReq)); + pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbRsp)); } for (int32_t i = 0; i < rspNum; i++) { SClientHbRsp rsp = {0}; @@ -1529,7 +1529,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { if (pRsp->vgNum <= 0) { return 0; } - + pRsp->pVgroupInfos = taosArrayInit(pRsp->vgNum, sizeof(SVgroupInfo)); if (pRsp->pVgroupInfos == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index dfa2b74755..157bad26a6 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -114,7 +114,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg; - /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2ea157fea4..60533b979c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -270,9 +270,8 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { strcpy(rsp.cgroup, pReq->cgroup); rsp.consumerId = consumerId; - rsp.epoch = pConsumer->epoch; - if (epoch != rsp.epoch) { - mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch); + if (epoch != pConsumer->epoch) { + mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch); SArray *pTopics = pConsumer->currentTopics; int32_t sz = taosArrayGetSize(pTopics); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); @@ -308,13 +307,16 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { mndReleaseSubscribe(pMnode, pSub); } } - int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp); void *buf = rpcMallocCont(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - void *abuf = buf; + ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP; + ((SMqRspHead *)buf)->epoch = pConsumer->epoch; + + void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); tDeleteSMqCMGetSubEpRsp(&rsp); mndReleaseConsumer(pMnode, pConsumer); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index d2318009d5..9822550ee5 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -72,7 +72,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { if (pRaw == NULL) goto TOPIC_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER); @@ -121,7 +121,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { int32_t len; int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER); @@ -206,7 +206,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { SName name = {0}; tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - char db[TSDB_TABLE_FNAME_LEN] = {0}; + char db[TSDB_TOPIC_FNAME_LEN] = {0}; tNameGetFullDbName(&name, db); return mndAcquireDb(pMnode, db); @@ -223,7 +223,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq pDrop->head.contLen = htonl(contLen); pDrop->head.vgId = htonl(pVgroup->vgId); - memcpy(pDrop->name, pTopic->name, TSDB_TABLE_FNAME_LEN); + memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN); pDrop->tuid = htobe64(pTopic->uid); return pDrop; @@ -343,6 +343,7 @@ CREATE_TOPIC_OVER: } static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { + // TODO: cannot drop when subscribed STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); @@ -408,75 +409,34 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - STableInfoReq infoReq = {0}; - - if (tSerializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - mDebug("topic:%s, start to retrieve meta", infoReq.tbName); - #if 0 - SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname); +static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { + SSdb *pSdb = pMnode->pSdb; + SDbObj *pDb = mndAcquireDb(pMnode, dbName); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("topic:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); return -1; } - STopicObj *pTopic = mndAcquireTopic(pMnode, pInfo->tableFname); - if (pTopic == NULL) { - mndReleaseDb(pMnode, pDb); - terrno = TSDB_CODE_MND_INVALID_TOPIC; - mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); - return -1; + int32_t numOfTopics = 0; + void *pIter = NULL; + while (1) { + SMqTopicObj *pTopic = NULL; + pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); + if (pIter == NULL) break; + + if (pTopic->dbUid == pDb->uid) { + numOfTopics++; + } + + sdbRelease(pSdb, pTopic); } - taosRLockLatch(&pTopic->lock); - int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags; - int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); - - STableMetaRsp *pMeta = rpcMallocCont(contLen); - if (pMeta == NULL) { - taosRUnLockLatch(&pTopic->lock); - mndReleaseDb(pMnode, pDb); - mndReleaseTopic(pMnode, pTopic); - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - memcpy(pMeta->topicFname, pTopic->name, TSDB_TABLE_FNAME_LEN); - pMeta->numOfTags = htonl(pTopic->numOfTags); - pMeta->numOfColumns = htonl(pTopic->numOfColumns); - pMeta->precision = pDb->cfg.precision; - pMeta->tableType = TSDB_SUPER_TABLE; - pMeta->update = pDb->cfg.update; - pMeta->sversion = htonl(pTopic->version); - pMeta->tuid = htonl(pTopic->uid); - - for (int32_t i = 0; i < totalCols; ++i) { - SSchema *pSchema = &pMeta->pSchemas[i]; - SSchema *pSrcSchema = &pTopic->pSchema[i]; - memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); - pSchema->type = pSrcSchema->type; - pSchema->colId = htonl(pSrcSchema->colId); - pSchema->bytes = htonl(pSrcSchema->bytes); - } - taosRUnLockLatch(&pTopic->lock); + *pNumOfTopics = numOfTopics; mndReleaseDb(pMnode, pDb); - mndReleaseTopic(pMnode, pTopic); - - pReq->pCont = pMeta; - pReq->contLen = contLen; - - mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags); -#endif return 0; } +#endif static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { SSdb *pSdb = pMnode->pSdb; @@ -571,7 +531,7 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in if (pTopic->dbUid != pDb->uid) { if (strncmp(pTopic->name, prefix, prefixLen) != 0) { - mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); + mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); } sdbRelease(pSdb, pTopic); @@ -580,8 +540,8 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in cols = 0; - char topicName[TSDB_TABLE_NAME_LEN] = {0}; - tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TABLE_NAME_LEN); + char topicName[TSDB_TOPIC_NAME_LEN] = {0}; + tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_TO_VARSTR(pWrite, topicName); cols++; diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index ffab15f394..36626514ec 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -51,7 +51,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl void tqClose(STQ*); // required by vnode -int tqPushMsg(STQ*, void* msg, int64_t version); +int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); int tqCommit(STQ*); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index e87de10912..30a83ca634 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -79,19 +79,19 @@ extern int32_t tqDebugFlag; // 4096 - 4080 #define TQ_IDX_PAGE_HEAD_SIZE 16 -#define TQ_ACTION_CONST 0 -#define TQ_ACTION_INUSE 1 +#define TQ_ACTION_CONST 0 +#define TQ_ACTION_INUSE 1 #define TQ_ACTION_INUSE_CONT 2 -#define TQ_ACTION_INTXN 3 +#define TQ_ACTION_INTXN 3 #define TQ_SVER 0 // TODO: inplace mode is not implemented #define TQ_UPDATE_INPLACE 0 -#define TQ_UPDATE_APPEND 1 +#define TQ_UPDATE_APPEND 1 #define TQ_DUP_INTXN_REWRITE 0 -#define TQ_DUP_INTXN_REJECT 2 +#define TQ_DUP_INTXN_REJECT 2 static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } @@ -160,7 +160,7 @@ struct STQ { STqMemRef tqMemRef; STqMetaStore* tqMeta; SWal* pWal; - SMeta* pMeta; + SMeta* pVnodeMeta; }; typedef struct { @@ -190,9 +190,6 @@ typedef struct { char* logicalPlan; char* physicalPlan; char* qmsg; - int64_t persistedOffset; - int64_t committedOffset; - int64_t currentOffset; STqBuffer buffer; SWalReadHandle* pReadhandle; } STqTopic; @@ -201,7 +198,7 @@ typedef struct { int64_t consumerId; int64_t epoch; char cgroup[TSDB_TOPIC_FNAME_LEN]; - SArray* topics; // SArray + SArray* topics; // SArray } STqConsumer; int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ac9dde3597..aa198d0806 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -42,7 +42,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl pTq->path = strdup(path); pTq->tqConfig = tqConfig; pTq->pWal = pWal; - pTq->pMeta = pMeta; + pTq->pVnodeMeta = pMeta; #if 0 pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocator = allocFac->create(allocFac); @@ -71,9 +71,11 @@ void tqClose(STQ* pTq) { // TODO } -int tqPushMsg(STQ* pTq, void* p, int64_t version) { - // add reference - // judge and launch new query +int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { + // TODO: add reference + // if handle waiting, launch query and response to consumer + // + // if no waiting handle, return return 0; } @@ -101,9 +103,9 @@ static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) /*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/ /*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/ tlen += taosEncodeString(buf, pTopic->qmsg); - tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset); - tlen += taosEncodeFixedI64(buf, pTopic->committedOffset); - tlen += taosEncodeFixedI64(buf, pTopic->currentOffset); + /*tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);*/ + /*tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);*/ + /*tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);*/ return tlen; } @@ -113,9 +115,9 @@ static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopi /*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/ /*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/ buf = taosDecodeString(buf, &pTopic->qmsg); - buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset); - buf = taosDecodeFixedI64(buf, &pTopic->committedOffset); - buf = taosDecodeFixedI64(buf, &pTopic->currentOffset); + /*buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);*/ + /*buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);*/ + /*buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);*/ return buf; } @@ -194,8 +196,8 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu } for (int j = 0; j < TQ_BUFFER_SIZE; j++) { pTopic->buffer.output[j].status = 0; - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta}; pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); } @@ -218,7 +220,11 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { fetchOffset = pReq->currentOffset + 1; } - SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; + SMqConsumeRsp rsp = { + .consumerId = consumerId, + .numOfTopics = 0, + .pBlockData = NULL, + }; STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { @@ -243,7 +249,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and - // rsponse to user + // response to user break; } pHead = pTopic->pReadhandle->pHead; @@ -268,19 +274,20 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { taosArrayPush(pRes, pDataBlock); rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.rspOffset = fetchOffset; - pTopic->currentOffset = fetchOffset; rsp.numOfTopics = 1; rsp.pBlockData = pRes; - int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp); void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; return -1; } + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; - void* abuf = buf; + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqConsumeRsp(&abuf, &rsp); taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); pMsg->pCont = buf; @@ -295,14 +302,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } } - int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp); void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; return -1; } + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; - void* abuf = buf; + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqConsumeRsp(&abuf, &rsp); rsp.pBlockData = NULL; pMsg->pCont = buf; @@ -312,158 +321,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } -#if 0 -int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) { - SMqConsumeReq* pReq = pMsg->pCont; - int64_t reqId = pReq->reqId; - int64_t consumerId = pReq->consumerId; - int64_t fetchOffset = pReq->offset; - int64_t blockingTime = pReq->blockingTime; - - SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; - - /*printf("vg %d get consume req\n", pReq->head.vgId);*/ - - STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); - if (pConsumer == NULL) { - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = -1; - rpcSendResponse(pMsg); - return 0; - } - int sz = taosArrayGetSize(pConsumer->topics); - - for (int i = 0; i < sz; i++) { - STqTopic* pTopic = taosArrayGet(pConsumer->topics, i); - // TODO: support multiple topic in one req - if (strcmp(pTopic->topicName, pReq->topic) != 0) { - ASSERT(false); - continue; - } - - if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) { - pTopic->committedOffset = pReq->offset; - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = 0; - rpcSendResponse(pMsg); - return 0; - } - - if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { - pTopic->committedOffset = pReq->offset - 1; - } - - rsp.committedOffset = pTopic->committedOffset; - rsp.reqOffset = pReq->offset; - rsp.skipLogNum = 0; - - if (fetchOffset <= pTopic->committedOffset) { - fetchOffset = pTopic->committedOffset + 1; - } - /*printf("vg %d fetch Offset %ld\n", pReq->head.vgId, fetchOffset);*/ - int8_t pos; - int8_t skip = 0; - SWalHead* pHead; - while (1) { - pos = fetchOffset % TQ_BUFFER_SIZE; - skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1); - if (skip == 1) { - // do nothing - break; - } - if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { - printf("read offset %ld\n", fetchOffset); - // check err - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - skip = 1; - break; - } - // read until find TDMT_VND_SUBMIT - pHead = pTopic->pReadhandle->pHead; - if (pHead->head.msgType == TDMT_VND_SUBMIT) { - } - rsp.skipLogNum++; - if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { - printf("read offset %ld\n", fetchOffset); - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - skip = 1; - break; - } - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - fetchOffset++; - } - if (skip == 1) continue; - SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; - qTaskInfo_t task = pTopic->buffer.output[pos].task; - - printf("current fetch offset %ld\n", fetchOffset); - qSetStreamInput(task, pCont); - - // SArray - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - while (1) { - SSDataBlock* pDataBlock; - uint64_t ts; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - break; - } - if (pDataBlock != NULL) { - taosArrayPush(pRes, pDataBlock); - } else { - break; - } - } - // TODO copy - rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; - rsp.rspOffset = fetchOffset; - pTopic->currentOffset = fetchOffset; - - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - - if (taosArrayGetSize(pRes) == 0) { - taosArrayDestroy(pRes); - fetchOffset++; - continue; - } else { - rsp.numOfTopics++; - } - - rsp.pBlockData = pRes; - -#if 0 - pTopic->buffer.output[pos].dst = pRes; - if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) { - pTopic->buffer.firstOffset = pReq->offset; - } - if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) { - pTopic->buffer.lastOffset = pReq->offset; - } -#endif - } - int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - pMsg->code = -1; - return -1; - } - void* abuf = buf; - tEncodeSMqConsumeRsp(&abuf, &rsp); - - if (rsp.pBlockData) { - taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); - rsp.pBlockData = NULL; - } - - pMsg->pCont = buf; - pMsg->contLen = tlen; - pMsg->code = 0; - rpcSendResponse(pMsg); - return 0; -} -#endif - int32_t tqProcessRebReq(STQ* pTq, char* msg) { SMqMVRebReq req = {0}; tDecodeSMqMVRebReq(msg, &req); @@ -505,8 +362,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pTopic->logicalPlan = req.logicalPlan; pTopic->physicalPlan = req.physicalPlan; pTopic->qmsg = req.qmsg; - pTopic->committedOffset = -1; - pTopic->currentOffset = -1; + /*pTopic->committedOffset = -1;*/ + /*pTopic->currentOffset = -1;*/ pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; @@ -516,8 +373,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { } for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta}; pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); } diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index c3947da459..81eb09f48f 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -59,7 +59,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // todo: change the interface here int64_t ver; taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); - if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) { + if (tqPushMsg(pVnode->pTq, ptr, pMsg->msgType, ver) < 0) { // TODO: handle error } diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index fbb0e75257..136afb9a15 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -196,9 +196,11 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { char *mode = NULL; if (tdFileOptions & TD_FILE_APPEND) { mode = (tdFileOptions & TD_FILE_TEXT) ? "at+" : "ab+"; - }else if (tdFileOptions & TD_FILE_TRUNC) { + } else if (tdFileOptions & TD_FILE_TRUNC) { mode = (tdFileOptions & TD_FILE_TEXT) ? "wt+" : "wb+"; - }else { + } else if ((tdFileOptions & TD_FILE_READ) && !(tdFileOptions & TD_FILE_WRITE)) { + mode = (tdFileOptions & TD_FILE_TEXT) ? "rt" : "rb"; + } else { mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+"; } assert(!(tdFileOptions & TD_FILE_EXCL)); @@ -635,7 +637,7 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) { } assert(pFile->fp != NULL); - char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0}; + char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0}; va_list ap; va_start(ap, format); vfprintf(pFile->fp, format, ap); @@ -673,11 +675,11 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict__ ptrBuf) { size_t len = 0; return getline(ptrBuf, &len, pFile->fp); } -int32_t taosEOFFile(TdFilePtr pFile) { +int32_t taosEOFFile(TdFilePtr pFile) { if (pFile == NULL) { return 0; } assert(pFile->fp != NULL); - return feof(pFile->fp); -} \ No newline at end of file + return feof(pFile->fp); +} diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index 8a1763c4fc..f62c43773d 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -41,9 +41,11 @@ void *cancelHandler(void *arg) { taosReleaseRef(tscObjRef, rid); #endif #else + reset_terminal_mode(); printf("\nReceive ctrl+c or other signal, quit shell.\n"); exit(0); #endif + reset_terminal_mode(); printf("\nReceive ctrl+c or other signal, quit shell.\n"); exit(0); }