This commit is contained in:
Liu Jicong 2022-03-01 17:49:14 +08:00
parent eae3d9a103
commit b5f5400d30
4 changed files with 104 additions and 97 deletions

View File

@ -13,19 +13,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "clientInt.h"
#include "trpc.h"
#include "catalog.h" #include "catalog.h"
#include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "trpc.h"
static SClientHbMgr clientHbMgr = {0}; static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread(); static int32_t hbCreateThread();
static void hbStopThread(); static void hbStopThread();
static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
return 0;
}
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0; int32_t code = 0;
@ -189,7 +187,8 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
tfree(param); tfree(param);
if (rspNum) { if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
} else { } else {
atomic_add_fetch_32(&emptyRspNum, 1); atomic_add_fetch_32(&emptyRspNum, 1);
} }
@ -266,7 +265,6 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
int64_t *clusterId = (int64_t *)param; int64_t *clusterId = (int64_t *)param;
struct SCatalog *pCatalog = NULL; struct SCatalog *pCatalog = NULL;
@ -287,13 +285,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req
return code; return code;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
}
void hbMgrInitMqHbHandle() { void hbMgrInitMqHbHandle() {
clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
@ -312,8 +307,6 @@ void hbFreeReq(void *req) {
tFreeReqKvHash(pReq->info); tFreeReqKvHash(pReq->info);
} }
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq *pBatchReq = calloc(1, sizeof(SClientHbBatchReq)); SClientHbBatchReq *pBatchReq = calloc(1, sizeof(SClientHbBatchReq));
if (pBatchReq == NULL) { if (pBatchReq == NULL) {
@ -350,7 +343,6 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
return pBatchReq; return pBatchReq;
} }
void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) { while (pIter != NULL) {
@ -363,8 +355,6 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
} }
} }
static void *hbThreadFunc(void *param) { static void *hbThreadFunc(void *param) {
setThreadName("hb"); setThreadName("hb");
while (1) { while (1) {
@ -458,6 +448,7 @@ static void hbStopThread() {
} }
SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
/*return NULL;*/
hbMgrInit(); hbMgrInit();
SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) { if (pAppHbMgr == NULL) {
@ -515,6 +506,7 @@ void appHbMgrCleanup(void) {
} }
int hbMgrInit() { int hbMgrInit() {
/*return 0;*/
// init once // init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0; if (old == 1) return 0;
@ -532,7 +524,7 @@ int hbMgrInit() {
} }
void hbMgrCleanUp() { void hbMgrCleanUp() {
return; /*return;*/
hbStopThread(); hbStopThread();
// destroy all appHbMgr // destroy all appHbMgr
@ -571,6 +563,7 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
} }
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
/*return 0;*/
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
SHbConnInfo info = {0}; SHbConnInfo info = {0};
@ -593,6 +586,7 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int3
} }
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
/*return;*/
int32_t code = 0; int32_t code = 0;
code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
@ -602,7 +596,9 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
} }
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen,
int32_t valueLen) {
return 0;
// find req by connection id // find req by connection id
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(pReq != NULL); ASSERT(pReq != NULL);

View File

@ -134,7 +134,7 @@ typedef struct {
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
conf->auto_commit = false; conf->auto_commit = false;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
return conf; return conf;
} }
@ -651,13 +651,17 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
if (pRsp == NULL) { if (pRsp == NULL) {
printf("fail\n");
return -1; return -1;
} }
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if (pRsp->consumeRsp.numOfTopics == 0) { if (pRsp->consumeRsp.numOfTopics == 0) {
/*printf("no data\n");*/ printf("no data\n");
if (pParam->epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
taosFreeQitem(pRsp); taosFreeQitem(pRsp);
return 0; return 0;
} }
@ -732,7 +736,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
return -1; return -1;
} }
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &pRsp->getEpRsp); tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp);
taosWriteQitem(tmq->mqueue, pRsp); taosWriteQitem(tmq->mqueue, pRsp);
} }
return 0; return 0;
@ -973,16 +977,21 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmqPollImpl(tmq, blocking_time); tmqPollImpl(tmq, blocking_time);
while (1) { while (1) {
/*printf("cycle\n");*/
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
tmqHandleAllRsp(tmq, blocking_time, true); rspMsg = tmqHandleAllRsp(tmq, blocking_time, true);
if (rspMsg) {
return rspMsg;
}
if (blocking_time != 0) {
int64_t endTime = taosGetTimestampMs(); int64_t endTime = taosGetTimestampMs();
if (endTime - startTime > blocking_time) { if (endTime - startTime > blocking_time) {
printf("cycle end\n"); printf("normal exit\n");
usleep(1000 * 1000);
return NULL; return NULL;
} }
} }
} }
}
#if 0 #if 0

View File

@ -278,14 +278,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
rsp.numOfTopics = 1; rsp.numOfTopics = 1;
rsp.pBlockData = pRes; rsp.pBlockData = pRes;
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
pMsg->code = -1; pMsg->code = -1;
return -1; return -1;
} }
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
void* abuf = buf; void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqConsumeRsp(&abuf, &rsp); tEncodeSMqConsumeRsp(&abuf, &rsp);
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
pMsg->pCont = buf; pMsg->pCont = buf;