Merge pull request #2802 from taosdata/feature/syncFC
processedCount to prevent race condition
This commit is contained in:
commit
23eeaa8c7e
|
@ -38,6 +38,8 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SRspRet rspRet;
|
SRspRet rspRet;
|
||||||
|
int32_t processedCount;
|
||||||
|
int32_t code;
|
||||||
void *pCont;
|
void *pCont;
|
||||||
int32_t contLen;
|
int32_t contLen;
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -187,13 +189,16 @@ void dnodeFreeVnodeWqueue(void *wqueue) {
|
||||||
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
|
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
|
||||||
SWriteMsg *pWrite = (SWriteMsg *)param;
|
SWriteMsg *pWrite = (SWriteMsg *)param;
|
||||||
|
|
||||||
if (code > 0) return;
|
if (code < 0) pWrite->code = code;
|
||||||
|
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
|
||||||
|
|
||||||
|
if (count <= 1) return;
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
.handle = pWrite->rpcMsg.handle,
|
.handle = pWrite->rpcMsg.handle,
|
||||||
.pCont = pWrite->rspRet.rsp,
|
.pCont = pWrite->rspRet.rsp,
|
||||||
.contLen = pWrite->rspRet.len,
|
.contLen = pWrite->rspRet.len,
|
||||||
.code = code,
|
.code = pWrite->code,
|
||||||
};
|
};
|
||||||
|
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
|
@ -239,7 +244,10 @@ static void *dnodeProcessWriteQueue(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
|
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
|
||||||
if (pWrite) pWrite->rpcMsg.code = code;
|
if (pWrite) {
|
||||||
|
pWrite->rpcMsg.code = code;
|
||||||
|
if (code <= 0) pWrite->processedCount = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
walFsync(vnodeGetWal(pVnode));
|
walFsync(vnodeGetWal(pVnode));
|
||||||
|
|
|
@ -69,8 +69,8 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
||||||
|
|
||||||
if (tid < pMeta->maxTables && pMeta->tables[tid] != NULL) {
|
if (tid < pMeta->maxTables && pMeta->tables[tid] != NULL) {
|
||||||
if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) {
|
if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) {
|
||||||
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pMeta->tables[tid]),
|
||||||
TABLE_TID(pTable), TABLE_UID(pTable));
|
TABLE_TID(pMeta->tables[tid]), TABLE_UID(pMeta->tables[tid]));
|
||||||
return TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
|
return TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
|
||||||
} else {
|
} else {
|
||||||
tsdbError("vgId:%d table %s at tid %d uid %" PRIu64
|
tsdbError("vgId:%d table %s at tid %d uid %" PRIu64
|
||||||
|
@ -1295,4 +1295,4 @@ static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) {
|
||||||
tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables);
|
tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue