diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index 270f02b995..5924caf6f4 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -166,7 +166,6 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi int code = TSDB_CODE_SUCCESS; SCachePool *pPool = (SCachePool *)(pVnode->pCachePool); SShellObj * pShell = (SShellObj *)param; - int pointsImported = 0; TSKEY firstKey, lastKey; payload = pSubmit->payLoad; @@ -200,25 +199,29 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi if (code != 0) return code; } + /* + * The timestamp of all records in a submit payload are always in ascending order, guaranteed by client, so here only + * the first key. + */ if (firstKey > pObj->lastKey) { // Just call insert - vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - // TODO: Here may fail to set the state, add error handling. - vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, pNumOfPoints, now); - // TODO: outside clear state function is invalid for this structure - vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); } else { // trigger import if (sversion != pObj->sversion) { dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId, pObj->sversion, sversion); return TSDB_CODE_OTHERS; } - - SImportInfo import; + + // check the table status for perform import historical data + if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING)) != TSDB_CODE_SUCCESS) { + return code; + } + + SImportInfo import = {0}; dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld", pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, pObj->lastKey); - memset(&import, 0, sizeof(import)); + import.firstKey = firstKey; import.lastKey = lastKey; import.pObj = pObj; @@ -226,8 +229,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi import.payload = payload; import.rows = rows; - // FIXME: mutex here seems meaningless and num here still can - // be changed + // FIXME: mutex here seems meaningless and num here still can be changed int32_t num = 0; pthread_mutex_lock(&pVnode->vmutex); num = pObj->numOfQueries; @@ -236,10 +238,12 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi int32_t commitInProcess = 0; pthread_mutex_lock(&pPool->vmutex); - if (((commitInProcess = pPool->commitInProcess) == 1) || - num > 0) { // mutual exclusion with read (need to change here) + if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) { + // mutual exclusion with read (need to change here) pthread_mutex_unlock(&pPool->vmutex); + vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); return TSDB_CODE_ACTION_IN_PROGRESS; + } else { pPool->commitInProcess = 1; pthread_mutex_unlock(&pPool->vmutex); @@ -248,7 +252,8 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi } pVnode->version++; } - + + vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); return code; } diff --git a/src/system/detail/src/vnodeMeter.c b/src/system/detail/src/vnodeMeter.c index f5619d72b8..77bafd50d4 100644 --- a/src/system/detail/src/vnodeMeter.c +++ b/src/system/detail/src/vnodeMeter.c @@ -584,12 +584,12 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_INSERT, cont, contLen, sversion); - if (code != 0) return code; + if (code != TSDB_CODE_SUCCESS) return code; } if (source == TSDB_DATA_SOURCE_SHELL && pVnode->cfg.replications > 1) { code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_INSERT, sversion); - if (code != 0) return code; + if (code != TSDB_CODE_SUCCESS) return code; } if (pObj->sversion < sversion) { @@ -601,11 +601,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi } pData = pSubmit->payLoad; - code = TSDB_CODE_SUCCESS; TSKEY firstKey = *((TSKEY *)pData); TSKEY lastKey = *((TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1))); int cfid = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; + TSKEY minAllowedKey = (cfid - pVnode->maxFiles + 1)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision]; TSKEY maxAllowedKey = (cfid + 2)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision] - 2; if (firstKey < minAllowedKey || firstKey > maxAllowedKey || lastKey < minAllowedKey || lastKey > maxAllowedKey) { @@ -619,7 +619,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi } for (i = 0; i < numOfPoints; ++i) { // meter will be dropped, abort current insertion - if (pObj->state >= TSDB_METER_STATE_DELETING) { + if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) { dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId, pObj->state); @@ -648,6 +648,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi pData += pObj->bytesPerPoint; points++; } + atomic_fetch_add_64(&(pVnode->vnodeStatistic.pointsWritten), points * (pObj->numOfColumns - 1)); atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), points * pObj->bytesPerPoint); @@ -660,6 +661,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi pVnode->version++; pthread_mutex_unlock(&(pVnode->vmutex)); + vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); _over: diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 0312fb07d6..164efb1198 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -509,7 +509,7 @@ static int vnodeCheckSubmitBlockContext(SShellSubmitBlock *pBlocks, SVnodeObj *p SMeterObj *pMeterObj = pVnode->meterList[sid]; if (pMeterObj == NULL) { - dError("vid:%d sid:%d, no active table", pVnode->vnode, sid); + dError("vid:%d sid:%d, not active table", pVnode->vnode, sid); vnodeSendMeterCfgMsg(pVnode->vnode, sid); return TSDB_CODE_NOT_ACTIVE_TABLE; } @@ -581,41 +581,27 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { if (code != TSDB_CODE_SUCCESS) break; SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]); + // dont include sid, vid int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; int32_t sversion = htonl(pBlocks->sversion); - int32_t state = TSDB_METER_STATE_READY; - state = vnodeSetMeterState(pMeterObj, (pSubmit->import ? TSDB_METER_STATE_IMPORTING : TSDB_METER_STATE_INSERT)); - - if (state == TSDB_METER_STATE_READY) { // meter status is ready for insert/import - if (pSubmit->import) { - code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, - sversion, &numOfPoints, now); - vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); - pObj->numOfTotalPoints += numOfPoints; - if (code == TSDB_CODE_SUCCESS) pObj->count--; - } else { - code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, - sversion, &numOfPoints, now); - vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT); - numOfTotalPoints += numOfPoints; + if (pSubmit->import) { + code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, + sversion, &numOfPoints, now); + pObj->numOfTotalPoints += numOfPoints; + + //records for one table should be consecutive located in the payload buffer, which is guaranteed by client + if (code == TSDB_CODE_SUCCESS) { + pObj->count--; } - if (code != TSDB_CODE_SUCCESS) break; } else { - if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) { - dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, - pMeterObj->state); - code = TSDB_CODE_NOT_ACTIVE_TABLE; - break; - } else {// waiting for 300ms by default and try again - dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid, - pMeterObj->meterId, pMeterObj->state); - - code = TSDB_CODE_ACTION_IN_PROGRESS; - break; - } + code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, + sversion, &numOfPoints, now); + numOfTotalPoints += numOfPoints; } + + if (code != TSDB_CODE_SUCCESS) break; pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint); @@ -635,7 +621,7 @@ _submit_over: pImportInfo->import = 1; pImportInfo->vnode = pSubmit->vnode; pImportInfo->numOfSid = pSubmit->numOfSid; - pImportInfo->ssid = i; + pImportInfo->ssid = i; // start from this position, not the initial position pImportInfo->pObj = pObj; pImportInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg)); assert(pImportInfo->offset >= 0); @@ -658,7 +644,7 @@ static void vnodeProcessBatchImportTimer(void *param, void *tmrId) { SBatchImportInfo *pImportInfo = (SBatchImportInfo *)param; assert(pImportInfo != NULL && pImportInfo->import); - int32_t i = 0, numOfPoints = 0, numOfTotalPoints = 0; + int32_t i = 0, numOfPoints = 0; int32_t code = TSDB_CODE_SUCCESS; SShellObj * pShell = pImportInfo->pObj; @@ -677,30 +663,11 @@ static void vnodeProcessBatchImportTimer(void *param, void *tmrId) { int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; int32_t sversion = htonl(pBlocks->sversion); - int32_t state = TSDB_METER_STATE_READY; - state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); - - if (state == TSDB_METER_STATE_READY) { // meter status is ready for insert/import - code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pShell, - sversion, &numOfPoints, now); - vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); - pShell->numOfTotalPoints += numOfPoints; - if (code != TSDB_CODE_SUCCESS) break; - pShell->count--; - } else { - if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) { - dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, - pMeterObj->state); - code = TSDB_CODE_NOT_ACTIVE_TABLE; - break; - } else { // waiting for 300ms by default and try again - dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid, - pMeterObj->meterId, pMeterObj->state); - - code = TSDB_CODE_ACTION_IN_PROGRESS; - break; - } - } + code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pShell, + sversion, &numOfPoints, now); + pShell->numOfTotalPoints += numOfPoints; + if (code != TSDB_CODE_SUCCESS) break; + pShell->count--; pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);