fix more issue
This commit is contained in:
parent
2268a1e94e
commit
7e174e347e
|
@ -693,6 +693,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
if (iCol >= pSchema->nCols) break;
|
||||
pColumn = &pSchema->pSchema[iCol];
|
||||
|
||||
ASSERT(pAlterTbReq->colName);
|
||||
if (strcmp(pColumn->name, pAlterTbReq->colName) == 0) break;
|
||||
iCol++;
|
||||
}
|
||||
|
|
|
@ -961,7 +961,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
|
|||
nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname);
|
||||
taosRemoveFile(fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pSetOld->pHeadF);
|
||||
}
|
||||
|
||||
|
@ -1114,7 +1114,7 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
|
|||
ASSERT(nRef >= 0);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
|
||||
taosRemoveFile(fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pSet->pHeadF);
|
||||
}
|
||||
|
||||
|
|
|
@ -843,7 +843,8 @@ _err:
|
|||
// SDataFReader ====================================================
|
||||
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
|
||||
int32_t code = 0;
|
||||
SDataFReader *pReader;
|
||||
int32_t lino = 0;
|
||||
SDataFReader *pReader = NULL;
|
||||
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
|
@ -851,7 +852,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
|||
pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
|
||||
if (pReader == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
pReader->pTsdb = pTsdb;
|
||||
pReader->pSet = pSet;
|
||||
|
@ -859,31 +860,40 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
|||
// head
|
||||
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
|
||||
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// data
|
||||
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
|
||||
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// sma
|
||||
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
|
||||
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// stt
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
|
||||
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSttFD[iStt]);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
*ppReader = pReader;
|
||||
return code;
|
||||
_exit:
|
||||
if (code) {
|
||||
*ppReader = NULL;
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb data file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
*ppReader = NULL;
|
||||
if (pReader) {
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) tsdbCloseFile(&pReader->aSttFD[iStt]);
|
||||
tsdbCloseFile(&pReader->pSmaFD);
|
||||
tsdbCloseFile(&pReader->pDataFD);
|
||||
tsdbCloseFile(&pReader->pHeadFD);
|
||||
taosMemoryFree(pReader);
|
||||
}
|
||||
} else {
|
||||
*ppReader = pReader;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -445,13 +445,14 @@ _err:
|
|||
|
||||
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
STsdbSnapReader* pReader = NULL;
|
||||
|
||||
// alloc
|
||||
pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
||||
if (pReader == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
pReader->pTsdb = pTsdb;
|
||||
pReader->sver = sver;
|
||||
|
@ -461,19 +462,19 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
|
|||
code = taosThreadRwlockRdlock(&pTsdb->rwLock);
|
||||
if (code) {
|
||||
code = TAOS_SYSTEM_ERROR(code);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbFSRef(pTsdb, &pReader->fs);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
if (code) {
|
||||
code = TAOS_SYSTEM_ERROR(code);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// data
|
||||
|
@ -485,43 +486,52 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
|
|||
pIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
||||
if (pIter->aBlockIdx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
} else {
|
||||
pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||
if (pIter->aSttBlk == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
code = tBlockDataCreate(&pIter->bData);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tBlockDataCreate(&pReader->bData);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// del
|
||||
pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
|
||||
if (pReader->aDelIdx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
|
||||
if (pReader->aDelData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
|
||||
*ppReader = pReader;
|
||||
return code;
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s, TSDB path: %s", TD_VID(pTsdb->pVnode), lino, tstrerror(code),
|
||||
pTsdb->path);
|
||||
*ppReader = NULL;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, vnode snapshot tsdb reader open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||
tstrerror(code));
|
||||
*ppReader = NULL;
|
||||
if (pReader) {
|
||||
taosArrayDestroy(pReader->aDelData);
|
||||
taosArrayDestroy(pReader->aDelIdx);
|
||||
tBlockDataDestroy(&pReader->bData, 1);
|
||||
tsdbFSDestroy(&pReader->fs);
|
||||
taosMemoryFree(pReader);
|
||||
}
|
||||
} else {
|
||||
*ppReader = pReader;
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,14 +43,23 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
||||
tStartDecode(&dc);
|
||||
|
||||
tDecodeI32v(&dc, &nReqs);
|
||||
if (tDecodeI32v(&dc, &nReqs) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
|
||||
tb_uid_t uid = tGenIdPI64();
|
||||
char *name = NULL;
|
||||
tStartDecode(&dc);
|
||||
|
||||
tDecodeI32v(&dc, NULL);
|
||||
tDecodeCStr(&dc, &name);
|
||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
if (tDecodeCStr(&dc, &name) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
*(int64_t *)(dc.data + dc.pos) = uid;
|
||||
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
|
||||
|
||||
|
@ -81,10 +90,19 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
char *name = NULL;
|
||||
|
||||
tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
|
||||
tStartDecode(&dc);
|
||||
if (tStartDecode(&dc) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
|
||||
tDecodeI32v(&dc, NULL);
|
||||
tDecodeCStr(&dc, &name);
|
||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
if (tDecodeCStr(&dc, &name) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
|
||||
uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
|
||||
if (uid == 0) {
|
||||
|
|
Loading…
Reference in New Issue