Fix several bug
This commit is contained in:
parent
1b1b71dd24
commit
6e48ca27d3
|
@ -230,15 +230,32 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
|
||||||
int *pNumOfPoints, TSKEY now) {
|
int *pNumOfPoints, TSKEY now) {
|
||||||
SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
|
SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
|
||||||
SVnodeObj * pVnode = vnodeList + pObj->vnode;
|
SVnodeObj * pVnode = vnodeList + pObj->vnode;
|
||||||
int rows;
|
int rows = 0;
|
||||||
char * payload;
|
char * payload = NULL;
|
||||||
int code = TSDB_CODE_ACTION_IN_PROGRESS;
|
int code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
|
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
|
||||||
SShellObj * pShell = (SShellObj *)param;
|
SShellObj * pShell = (SShellObj *)param;
|
||||||
int pointsImported = 0;
|
int pointsImported = 0;
|
||||||
TSKEY minKey, maxKey;
|
TSKEY firstKey, lastKey;
|
||||||
|
|
||||||
|
payload = pSubmit->payLoad;
|
||||||
|
firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
|
||||||
|
|
||||||
|
if (firstKey > pObj->lastKey) { // Just call insert
|
||||||
|
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
|
||||||
|
vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
|
||||||
|
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, &pointsImported, now);
|
||||||
|
|
||||||
|
if (pShell) {
|
||||||
|
pShell->code = code;
|
||||||
|
pShell->numOfTotalPoints += pointsImported;
|
||||||
|
}
|
||||||
|
|
||||||
|
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
|
||||||
|
} else { // trigger import
|
||||||
|
{
|
||||||
rows = htons(pSubmit->numOfRows);
|
rows = htons(pSubmit->numOfRows);
|
||||||
|
assert(rows > 0);
|
||||||
int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows);
|
int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows);
|
||||||
if (expectedLen != contLen) {
|
if (expectedLen != contLen) {
|
||||||
dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId,
|
dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId,
|
||||||
|
@ -246,7 +263,6 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
|
||||||
return TSDB_CODE_WRONG_MSG_SIZE;
|
return TSDB_CODE_WRONG_MSG_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: check sversion here should not be here (Take import convert to insert case into consideration)
|
|
||||||
if (sversion != pObj->sversion) {
|
if (sversion != pObj->sversion) {
|
||||||
dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId,
|
dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId,
|
||||||
pObj->sversion, sversion);
|
pObj->sversion, sversion);
|
||||||
|
@ -254,9 +270,8 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check timestamp context.
|
// Check timestamp context.
|
||||||
payload = pSubmit->payLoad;
|
TSKEY minKey = 0, maxKey = 0;
|
||||||
TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
|
lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
|
||||||
TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
|
|
||||||
assert(firstKey <= lastKey);
|
assert(firstKey <= lastKey);
|
||||||
vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey);
|
vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey);
|
||||||
if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) {
|
if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) {
|
||||||
|
@ -267,25 +282,14 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
|
||||||
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
|
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: Commit log here is invalid (Take retry into consideration)
|
// TODO: Retry here will cause duplicate commit log written
|
||||||
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
|
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
|
||||||
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
|
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
|
||||||
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
|
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
|
||||||
if (code != 0) return code;
|
if (code != 0) return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (firstKey > pObj->lastKey) { // Just call insert
|
|
||||||
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
|
|
||||||
vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
|
|
||||||
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now);
|
|
||||||
|
|
||||||
if (pShell) {
|
|
||||||
pShell->code = code;
|
|
||||||
pShell->numOfTotalPoints += pointsImported;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
|
|
||||||
} else { // trigger import
|
|
||||||
SImportInfo *pNew, import;
|
SImportInfo *pNew, import;
|
||||||
|
|
||||||
dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld",
|
dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld",
|
||||||
|
@ -1343,7 +1347,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
|
||||||
int code = -1;
|
int code = -1;
|
||||||
SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache);
|
SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache);
|
||||||
int payloadIter;
|
int payloadIter;
|
||||||
SCachePool * pPool = pVnode->pCachePool;
|
SCachePool * pPool = (SCachePool *)(pVnode->pCachePool);
|
||||||
int isCacheIterEnd = 0;
|
int isCacheIterEnd = 0;
|
||||||
int spayloadIter = 0;
|
int spayloadIter = 0;
|
||||||
int isAppendData = 0;
|
int isAppendData = 0;
|
||||||
|
@ -1369,7 +1373,10 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
|
||||||
|
|
||||||
if (pInfo->numOfBlocks == 0) {
|
if (pInfo->numOfBlocks == 0) {
|
||||||
if (vnodeAllocateCacheBlock(pObj) < 0) {
|
if (vnodeAllocateCacheBlock(pObj) < 0) {
|
||||||
// TODO: deal with the ERROR here
|
pImport->importedRows = 0;
|
||||||
|
pImport->commit = 1;
|
||||||
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue