[TD-437] fix definite lost while process show msg in mnode
This commit is contained in:
parent
6482d53723
commit
488addaf44
|
@ -220,6 +220,7 @@ typedef struct SAcctObj {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
int32_t index;
|
||||||
char db[TSDB_DB_NAME_LEN + 1];
|
char db[TSDB_DB_NAME_LEN + 1];
|
||||||
void * pIter;
|
void * pIter;
|
||||||
int16_t numOfColumns;
|
int16_t numOfColumns;
|
||||||
|
@ -228,7 +229,6 @@ typedef struct {
|
||||||
int32_t numOfReads;
|
int32_t numOfReads;
|
||||||
int16_t offset[TSDB_MAX_COLUMNS];
|
int16_t offset[TSDB_MAX_COLUMNS];
|
||||||
int16_t bytes[TSDB_MAX_COLUMNS];
|
int16_t bytes[TSDB_MAX_COLUMNS];
|
||||||
void * signature;
|
|
||||||
uint16_t payloadLen;
|
uint16_t payloadLen;
|
||||||
char payload[];
|
char payload[];
|
||||||
} SShowObj;
|
} SShowObj;
|
||||||
|
|
|
@ -47,13 +47,14 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg);
|
||||||
static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg);
|
static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg);
|
||||||
|
|
||||||
static void mnodeFreeShowObj(void *data);
|
static void mnodeFreeShowObj(void *data);
|
||||||
static bool mnodeCheckShowObj(SShowObj *pShow);
|
static bool mnodeAccquireShowObj(SShowObj *pShow);
|
||||||
static bool mnodeCheckShowFinished(SShowObj *pShow);
|
static bool mnodeCheckShowFinished(SShowObj *pShow);
|
||||||
static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size);
|
static void *mnodePutShowObj(SShowObj *pShow, int32_t size);
|
||||||
static void mnodeCleanupShowObj(void *pShow, bool forceRemove);
|
static void mnodeReleaseShowObj(void *pShow, bool forceRemove);
|
||||||
|
|
||||||
extern void *tsMnodeTmr;
|
extern void *tsMnodeTmr;
|
||||||
static void *tsQhandleCache = NULL;
|
static void *tsMnodeShowCache = NULL;
|
||||||
|
static int32_t tsShowObjIndex = 0;
|
||||||
static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
|
static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
|
||||||
static SShowRetrieveFp tsMnodeShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0};
|
static SShowRetrieveFp tsMnodeShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0};
|
||||||
|
|
||||||
|
@ -64,14 +65,15 @@ int32_t mnodeInitShow() {
|
||||||
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
|
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
|
||||||
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
|
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
|
||||||
|
|
||||||
tsQhandleCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj);
|
tsMnodeShowCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mnodeCleanUpShow() {
|
void mnodeCleanUpShow() {
|
||||||
if (tsQhandleCache != NULL) {
|
if (tsMnodeShowCache != NULL) {
|
||||||
taosCacheCleanup(tsQhandleCache);
|
mPrint("show cache is cleanup");
|
||||||
tsQhandleCache = NULL;
|
taosCacheCleanup(tsMnodeShowCache);
|
||||||
|
tsMnodeShowCache = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,13 +120,12 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen);
|
int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen);
|
||||||
SShowObj *pShow = (SShowObj *) calloc(1, showObjSize);
|
SShowObj *pShow = (SShowObj *) calloc(1, showObjSize);
|
||||||
pShow->signature = pShow;
|
|
||||||
pShow->type = pShowMsg->type;
|
pShow->type = pShowMsg->type;
|
||||||
pShow->payloadLen = htons(pShowMsg->payloadLen);
|
pShow->payloadLen = htons(pShowMsg->payloadLen);
|
||||||
strcpy(pShow->db, pShowMsg->db);
|
strcpy(pShow->db, pShowMsg->db);
|
||||||
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
|
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
|
||||||
|
|
||||||
pShow = mnodeSaveShowObj(pShow, showObjSize);
|
pShow = mnodePutShowObj(pShow, showObjSize);
|
||||||
if (pShow == NULL) {
|
if (pShow == NULL) {
|
||||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -132,21 +133,22 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
|
||||||
int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
|
int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
|
||||||
SCMShowRsp *pShowRsp = rpcMallocCont(size);
|
SCMShowRsp *pShowRsp = rpcMallocCont(size);
|
||||||
if (pShowRsp == NULL) {
|
if (pShowRsp == NULL) {
|
||||||
mnodeFreeShowObj(pShow);
|
mnodeReleaseShowObj(pShow, true);
|
||||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
pShowRsp->qhandle = htobe64((uint64_t) pShow);
|
pShowRsp->qhandle = htobe64((uint64_t) pShow);
|
||||||
|
|
||||||
mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type));
|
mTrace("%p, show type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type));
|
||||||
int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->rpcMsg.handle);
|
int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->rpcMsg.handle);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pMsg->rpcRsp.rsp = pShowRsp;
|
pMsg->rpcRsp.rsp = pShowRsp;
|
||||||
pMsg->rpcRsp.len = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
|
pMsg->rpcRsp.len = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
|
||||||
|
mnodeReleaseShowObj(pShow, false);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code));
|
mError("%p, show type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code));
|
||||||
rpcFreeCont(pShowRsp);
|
rpcFreeCont(pShowRsp);
|
||||||
mnodeCleanupShowObj(pShow, true);
|
mnodeReleaseShowObj(pShow, true);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -159,22 +161,20 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
|
||||||
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
|
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
|
||||||
|
|
||||||
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
|
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
|
||||||
mTrace("show:%p, type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type));
|
mTrace("%p, show type:%s, retrieve data", pShow, mnodeGetShowType(pShow->type));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* in case of server restart, apps may hold qhandle created by server before
|
* in case of server restart, apps may hold qhandle created by server before
|
||||||
* restart, which is actually invalid, therefore, signature check is required.
|
* restart, which is actually invalid, therefore, signature check is required.
|
||||||
*/
|
*/
|
||||||
if (!mnodeCheckShowObj(pShow)) {
|
if (!mnodeAccquireShowObj(pShow)) {
|
||||||
mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pShow);
|
mError("%p, show is invalid", pShow);
|
||||||
return TSDB_CODE_INVALID_QHANDLE;
|
return TSDB_CODE_INVALID_QHANDLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mnodeCheckShowFinished(pShow)) {
|
if (mnodeCheckShowFinished(pShow)) {
|
||||||
mTrace("retrieve:%p, qhandle:%p already read finished, numOfReads:%d numOfRows:%d", pRetrieve, pShow, pShow->numOfReads, pShow->numOfRows);
|
mTrace("%p, show is already read finished, numOfReads:%d numOfRows:%d", pShow, pShow->numOfReads, pShow->numOfRows);
|
||||||
pShow->numOfReads = pShow->numOfRows;
|
pShow->numOfReads = pShow->numOfRows;
|
||||||
//mnodeCleanupShowObj(pShow, true);
|
|
||||||
//return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||||
|
@ -200,7 +200,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
if (rowsRead < 0) {
|
if (rowsRead < 0) {
|
||||||
rpcFreeCont(pRsp);
|
rpcFreeCont(pRsp);
|
||||||
mnodeCleanupShowObj(pShow, false);
|
mnodeReleaseShowObj(pShow, false);
|
||||||
assert(false);
|
assert(false);
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
@ -211,10 +211,11 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
|
||||||
pMsg->rpcRsp.rsp = pRsp;
|
pMsg->rpcRsp.rsp = pRsp;
|
||||||
pMsg->rpcRsp.len = size;
|
pMsg->rpcRsp.len = size;
|
||||||
|
|
||||||
if (rowsToRead == 0) {
|
if (rowsToRead == 0 || (rowsRead == rowsToRead)) {
|
||||||
mnodeCleanupShowObj(pShow, true);
|
pRsp->completed = 1;
|
||||||
|
mnodeReleaseShowObj(pShow, true);
|
||||||
} else {
|
} else {
|
||||||
mnodeCleanupShowObj(pShow, false);
|
mnodeReleaseShowObj(pShow, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -318,24 +319,29 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mnodeCheckShowObj(SShowObj *pShow) {
|
static bool mnodeAccquireShowObj(SShowObj *pShow) {
|
||||||
SShowObj *pSaved = taosCacheAcquireByData(tsQhandleCache, pShow);
|
char key[10];
|
||||||
|
sprintf(key, "%d", pShow->index);
|
||||||
|
|
||||||
|
SShowObj *pSaved = taosCacheAcquireByName(tsMnodeShowCache, key);
|
||||||
if (pSaved == pShow) {
|
if (pSaved == pShow) {
|
||||||
|
mTrace("%p, show is accquired from cache", pShow);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
mTrace("show:%p, is already released", pShow);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) {
|
static void *mnodePutShowObj(SShowObj *pShow, int32_t size) {
|
||||||
if (tsQhandleCache != NULL) {
|
if (tsMnodeShowCache != NULL) {
|
||||||
char key[24];
|
char key[10];
|
||||||
sprintf(key, "show:%p", pShow);
|
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
|
||||||
SShowObj *newQhandle = taosCachePut(tsQhandleCache, key, pShow, size, 60);
|
sprintf(key, "%d", pShow->index);
|
||||||
|
|
||||||
|
SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, pShow, size, 60);
|
||||||
free(pShow);
|
free(pShow);
|
||||||
|
|
||||||
mTrace("show:%p, is saved", newQhandle);
|
mTrace("%p, show is put into cache", newQhandle);
|
||||||
return newQhandle;
|
return newQhandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,10 +351,10 @@ static void *mnodeSaveShowObj(SShowObj *pShow, int32_t size) {
|
||||||
static void mnodeFreeShowObj(void *data) {
|
static void mnodeFreeShowObj(void *data) {
|
||||||
SShowObj *pShow = data;
|
SShowObj *pShow = data;
|
||||||
sdbFreeIter(pShow->pIter);
|
sdbFreeIter(pShow->pIter);
|
||||||
mTrace("show:%p, is destroyed", pShow);
|
mTrace("%p, show is destroyed", pShow);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mnodeCleanupShowObj(void *pShow, bool forceRemove) {
|
static void mnodeReleaseShowObj(void *pShow, bool forceRemove) {
|
||||||
mTrace("show:%p, is released, force:%s", pShow, forceRemove ? "true" : "false");
|
mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false");
|
||||||
taosCacheRelease(tsQhandleCache, &pShow, forceRemove);
|
taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue