[TD-771] refCount while process sdbqueue
This commit is contained in:
parent
a1ebea86a5
commit
3b2b281045
|
@ -551,16 +551,20 @@ static int sdbWrite(void *param, void *data, int type) {
|
|||
|
||||
// from app, oper is created
|
||||
if (pOper != NULL) {
|
||||
sdbTrace("record from app is disposed, version:%" PRIu64 " result:%s", pHead->version, tstrerror(code));
|
||||
sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s",
|
||||
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version,
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
// from wal or forward msg, oper not created, should add into hash
|
||||
if (tsSdbObj.sync != NULL) {
|
||||
sdbTrace("record from wal forward is disposed, version:%" PRIu64 " confirm it", pHead->version);
|
||||
sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it",
|
||||
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
||||
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
|
||||
} else {
|
||||
sdbTrace("record from wal restore is disposed, version:%" PRIu64 , pHead->version);
|
||||
sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
||||
}
|
||||
|
||||
if (action == SDB_ACTION_INSERT) {
|
||||
|
@ -626,9 +630,11 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
|
|||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
|
||||
if (pNewOper->pMsg != NULL) {
|
||||
sdbTrace("app:%p:%p, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
|
||||
sdbTrace("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
|
||||
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
|
||||
}
|
||||
|
||||
sdbIncRef(pNewOper->table, pNewOper->pObj);
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -674,9 +680,11 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
|
|||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
|
||||
if (pNewOper->pMsg != NULL) {
|
||||
sdbTrace("app:%p:%p, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
|
||||
sdbTrace("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
|
||||
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
|
||||
}
|
||||
|
||||
sdbIncRef(pNewOper->table, pNewOper->pObj);
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -722,9 +730,11 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
|
|||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
|
||||
if (pNewOper->pMsg != NULL) {
|
||||
sdbTrace("app:%p:%p, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
|
||||
sdbTrace("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
|
||||
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
|
||||
}
|
||||
|
||||
sdbIncRef(pNewOper->table, pNewOper->pObj);
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -943,7 +953,9 @@ static void *sdbWorkerFp(void *param) {
|
|||
}
|
||||
|
||||
if (pOper != NULL && pOper->pMsg != NULL) {
|
||||
sdbTrace("app:%p:%p, will be processed in sdb queue", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg);
|
||||
sdbTrace("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
|
||||
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
|
||||
sdbGetKeyStr(pOper->table, pOper->pObj), pHead->version);
|
||||
}
|
||||
|
||||
int32_t code = sdbWrite(pOper, pHead, type);
|
||||
|
@ -959,15 +971,20 @@ static void *sdbWorkerFp(void *param) {
|
|||
if (type == TAOS_QTYPE_RPC) {
|
||||
pOper = (SSdbOper *)item;
|
||||
if (pOper != NULL && pOper->cb != NULL) {
|
||||
sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i);
|
||||
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
|
||||
}
|
||||
|
||||
|
||||
if (pOper != NULL && pOper->pMsg != NULL) {
|
||||
sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg,
|
||||
tstrerror(pOper->retCode));
|
||||
}
|
||||
|
||||
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
|
||||
|
||||
if (pOper != NULL) {
|
||||
sdbDecRef(pOper->table, pOper->pObj);
|
||||
}
|
||||
}
|
||||
taosFreeQitem(item);
|
||||
}
|
||||
|
|
|
@ -1420,7 +1420,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
|||
continue;
|
||||
}
|
||||
if (pTable->vgHash == NULL) {
|
||||
mError("app:%p:%p, stable:%s, not vgroup exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg,
|
||||
mError("app:%p:%p, stable:%s, no vgroup exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg,
|
||||
stableName);
|
||||
mnodeDecTableRef(pTable);
|
||||
|
||||
|
|
|
@ -11,8 +11,9 @@ set -e
|
|||
|
||||
CMD_NAME=
|
||||
LOOP_TIMES=5
|
||||
SLEEP_TIME=0
|
||||
|
||||
while getopts "f:t:" arg
|
||||
while getopts "f:t:s:" arg
|
||||
do
|
||||
case $arg in
|
||||
f)
|
||||
|
@ -21,6 +22,9 @@ do
|
|||
t)
|
||||
LOOP_TIMES=$OPTARG
|
||||
;;
|
||||
s)
|
||||
SLEEP_TIME=$OPTARG
|
||||
;;
|
||||
?)
|
||||
echo "unknow argument"
|
||||
;;
|
||||
|
@ -29,6 +33,7 @@ done
|
|||
|
||||
echo LOOP_TIMES ${LOOP_TIMES}
|
||||
echo CMD_NAME ${CMD_NAME}
|
||||
echo SLEEP_TIME ${SLEEP_TIME}
|
||||
|
||||
GREEN='\033[1;32m'
|
||||
GREEN_DARK='\033[0;32m'
|
||||
|
@ -40,5 +45,5 @@ do
|
|||
echo -e $GREEN loop $i $NC
|
||||
echo -e $GREEN cmd $CMD_NAME $NC
|
||||
$CMD_NAME
|
||||
sleep 2
|
||||
sleep ${SLEEP_TIME}
|
||||
done
|
||||
|
|
Loading…
Reference in New Issue