diff --git a/src/main/java/org/wikitolearn/wikirating/exception/PreviousProcessOngoingException.java b/src/main/java/org/wikitolearn/wikirating/exception/PreviousProcessOngoingException.java new file mode 100644 index 0000000..84955c8 --- /dev/null +++ b/src/main/java/org/wikitolearn/wikirating/exception/PreviousProcessOngoingException.java @@ -0,0 +1,21 @@ +/** + * + */ +package org.wikitolearn.wikirating.exception; + +/** + * @author aletundo + * + */ +public class PreviousProcessOngoingException extends RuntimeException { + + private static final long serialVersionUID = -4938271048372946378L; + + public PreviousProcessOngoingException(){ + super("Cannot create new Process: the previous one is still ONGOING"); + } + + public PreviousProcessOngoingException(String message){ + super(message); + } +} diff --git a/src/main/java/org/wikitolearn/wikirating/service/MaintenanceService.java b/src/main/java/org/wikitolearn/wikirating/service/MaintenanceService.java index 7f8dc69..6ea7155 100644 --- a/src/main/java/org/wikitolearn/wikirating/service/MaintenanceService.java +++ b/src/main/java/org/wikitolearn/wikirating/service/MaintenanceService.java @@ -1,231 +1,233 @@ package org.wikitolearn.wikirating.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.wikitolearn.wikirating.exception.GetPagesUpdateInfoException; -import org.wikitolearn.wikirating.exception.TemporaryVoteValidationException; -import org.wikitolearn.wikirating.exception.UpdateGraphException; -import org.wikitolearn.wikirating.exception.UpdatePagesAndRevisionsException; -import org.wikitolearn.wikirating.exception.UpdateUsersException; +import org.wikitolearn.wikirating.exception.*; import org.wikitolearn.wikirating.model.Process; import org.wikitolearn.wikirating.model.Revision; import org.wikitolearn.wikirating.model.UpdateInfo; import org.wikitolearn.wikirating.repository.TemporaryVoteRepository; import org.wikitolearn.wikirating.service.mediawiki.UpdateMediaWikiService; import org.wikitolearn.wikirating.util.enums.ProcessStatus; import org.wikitolearn.wikirating.util.enums.ProcessType; //import org.wikitolearn.wikirating.util.enums.UpdateType; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * @author aletundo * @author valsdav */ @Service public class MaintenanceService { private static final Logger LOG = LoggerFactory.getLogger(MaintenanceService.class); @Autowired private UserService userService; @Autowired private PageService pageService; @Autowired private RevisionService revisionService; @Autowired private MetadataService metadataService; @Autowired private ProcessService processService; @Autowired private VoteService voteService; @Autowired private UpdateMediaWikiService updateMediaWikiService; @Value("#{'${mediawiki.langs}'.split(',')}") private List langs; @Value("${mediawiki.protocol}") private String protocol; @Value("${mediawiki.api.url}") private String apiUrl; @Value("${mediawiki.namespace}") private String namespace; /** * Initialize the graph for the first time using parallel threads for each domain language * @return true if initialization succeed */ public boolean initializeGraph() { // Initialize Metadata service metadataService.initMetadata(); // Start a new Process Process initProcess = processService.addProcess(ProcessType.INIT); metadataService.addFirstProcess(initProcess); CompletableFuture initFuture = CompletableFuture .allOf(buildUsersAndPagesFutersList().toArray(new CompletableFuture[langs.size() + 1])) .thenCompose(result -> CompletableFuture .allOf(buildRevisionsFuturesList().toArray(new CompletableFuture[langs.size()]))) .thenCompose(result -> CompletableFuture .allOf(buildApplyCourseStructureFuturesList().toArray(new CompletableFuture[langs.size()]))) .thenCompose(result -> userService.initAuthorship()); try { boolean result = initFuture.get(); // Save the result of the process if (result){ processService.closeCurrentProcess(ProcessStatus.DONE); }else{ processService.closeCurrentProcess(ProcessStatus.EXCEPTION); } return result; } catch (InterruptedException | ExecutionException e) { LOG.error("Something went wrong. {}", e.getMessage()); return false; } } /** * Build a list of CompletableFuture. The elements are the fetches of pages' * revisions from each domain language. * * @return a list of CompletableFuture */ private List> buildApplyCourseStructureFuturesList() { List> parallelApplyCourseStructureFutures = new ArrayList<>(); // Add course structure for each domain language for (String lang : langs) { String url = protocol + lang + "." + apiUrl; parallelApplyCourseStructureFutures.add(pageService.applyCourseStructure(url, lang)); } return parallelApplyCourseStructureFutures; } /** * Build a list of CompletableFuture. The elements are the fetches of pages' * revisions from each domain language. * * @return a list of CompletableFuture */ private List> buildRevisionsFuturesList() { List> parallelRevisionsFutures = new ArrayList<>(); // Add revisions fetch for each domain language for (String lang : langs) { String url = protocol + lang + "." + apiUrl; parallelRevisionsFutures.add(revisionService.initRevisions(lang, url)); } return parallelRevisionsFutures; } /** * Build a list of CompletableFuture. The first one is the fetch of the * users from the first domain in mediawiki.langs list. The rest of the * elements are the fetches of the pages for each language. This * implementation assumes that the users are shared among domains. * * @return a list of CompletableFuture */ private List> buildUsersAndPagesFutersList() { List> usersAndPagesInsertion = new ArrayList<>(); // Add users fetch as fist operation usersAndPagesInsertion.add(userService.initUsers(protocol + langs.get(0) + "." + apiUrl)); // Add pages fetch for each domain language for (String lang : langs) { String url = protocol + lang + "." + apiUrl; usersAndPagesInsertion.add(pageService.initPages(lang, url)); } return usersAndPagesInsertion; } /** * Entry point for the scheduled graph updated * @return true if the update succeed */ @Scheduled(cron = "${maintenance.update.cron}") public void updateGraph() { // Get start timestamp of the latest FETCH Process before opening a new process Date startTimestampLatestFetch = processService.getLastProcessStartDateByType(ProcessType.FETCH); if(startTimestampLatestFetch == null){ startTimestampLatestFetch = processService.getLastProcessStartDateByType(ProcessType.INIT); } // Create a new FETCH process - Process currentFetchProcess = processService.addProcess(ProcessType.FETCH); - metadataService.updateLatestProcess(); + Process currentFetchProcess = null; + try { + currentFetchProcess = processService.addProcess(ProcessType.FETCH); + metadataService.updateLatestProcess(); + } catch (PreviousProcessOngoingException e){ + LOG.error("Cannot start Update process because the previous process is still ONGOING. Waiting next turn."); + return; + } Date startTimestampCurrentFetch = currentFetchProcess.getStartOfProcess(); try { userService.updateUsers(protocol + langs.get(0) + "." + apiUrl, startTimestampLatestFetch, startTimestampCurrentFetch); updatePagesAndRevisions(startTimestampLatestFetch, startTimestampCurrentFetch); /*for (String lang : langs) { String url = protocol + lang + "." + apiUrl; pageService.applyCourseStructure(url, lang); }*/ voteService.validateTemporaryVotes(startTimestampCurrentFetch); // Save the result of the process, closing the current one processService.closeCurrentProcess(ProcessStatus.DONE); } catch (TemporaryVoteValidationException | UpdateUsersException | UpdatePagesAndRevisionsException e) { processService.closeCurrentProcess(ProcessStatus.EXCEPTION); LOG.error("An error occurred during a scheduled graph update procedure"); throw new UpdateGraphException(); } } /** * Update the pages and the revisions querying the MediaWiki API * @param start * @param end */ private void updatePagesAndRevisions(Date start, Date end) throws UpdatePagesAndRevisionsException{ try{ // First of all, get the RecentChangeEvents from MediaWiki API for (String lang : langs) { String url = protocol + lang + "." + apiUrl; // Fetching pages updates List updates = updateMediaWikiService.getPagesUpdateInfo(url, namespace, start, end); for(UpdateInfo update : updates){ switch (update.getType()) { case "new": // Create the new revision Revision newRev = revisionService.addRevision(update.getRevid(), lang, update.getUserid(), update.getOld_revid(), update.getNewlen(), update.getTimestamp()); // Then create a new Page and link it with the revision pageService.addPage(update.getPageid(), update.getTitle(), lang, newRev); userService.setAuthorship(newRev); break; case "edit": // Create a new revision Revision updateRev = revisionService.addRevision(update.getRevid(), lang, update.getUserid(), update.getOld_revid(), update.getNewlen(), update.getTimestamp()); // Then add it to the page pageService.addRevisionToPage(lang + "_" + update.getPageid(), updateRev); userService.setAuthorship(updateRev); break; case "move": // Move the page to the new title pageService.movePage(update.getTitle(), update.getNewTitle(), lang); break; case "delete": // Delete the page and all its revisions pageService.deletePage(update.getTitle(), lang); break; default: break; } } } }catch(GetPagesUpdateInfoException e){ LOG.error("An error occurred while updating pages and revisions: {}", e.getMessage()); throw new UpdatePagesAndRevisionsException(); } } } diff --git a/src/main/java/org/wikitolearn/wikirating/service/ProcessService.java b/src/main/java/org/wikitolearn/wikirating/service/ProcessService.java index aa00621..c90ed14 100644 --- a/src/main/java/org/wikitolearn/wikirating/service/ProcessService.java +++ b/src/main/java/org/wikitolearn/wikirating/service/ProcessService.java @@ -1,88 +1,96 @@ package org.wikitolearn.wikirating.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.wikitolearn.wikirating.exception.AddProcessException; +import org.wikitolearn.wikirating.exception.PreviousProcessOngoingException; import org.wikitolearn.wikirating.model.Process; import org.wikitolearn.wikirating.repository.ProcessRepository; import org.wikitolearn.wikirating.util.enums.ProcessStatus; import org.wikitolearn.wikirating.util.enums.ProcessType; import java.util.Date; /** * Created by valsdav on 29/03/17. */ @Service public class ProcessService { private static final Logger LOG = LoggerFactory.getLogger(ProcessService.class); @Autowired private ProcessRepository processRepository; /** * This method creates a new process of the specified * type and adds it on the top of the processes chain. * @param type Type of process requested * @return returns the created process * @throws AddProcessException */ - public Process addProcess(ProcessType type) throws AddProcessException{ - try{ - Process process = new Process(type); - Process previousProcess = processRepository.getLatestProcess(); - if (previousProcess != null) { + public Process addProcess(ProcessType type) throws AddProcessException, PreviousProcessOngoingException{ + try { + Process process = new Process(type); + Process previousProcess = processRepository.getLatestProcess(); + if (previousProcess != null) { + // Check if the process is still ONGOING + if (previousProcess.getProcessStatus() == ProcessStatus.ONGOING) { + LOG.error("Cannot create new Process: the previous one is still ONGOING"); + throw new PreviousProcessOngoingException(); + } process.setPreviousProcess(previousProcess); } processRepository.save(process); LOG.info("Created new process: {}", process.toString()); return process; + }catch (PreviousProcessOngoingException p){ + throw p; }catch(Exception e){ LOG.error("An error occurred during process creation: {}", e.getMessage()); throw new AddProcessException(); } } /** * This method modify the status of the last opened process * and saves it. * @param status Final status of the process * @return returns the closed process */ public Process closeCurrentProcess(ProcessStatus status){ Process currentProcess = processRepository.getOnGoingProcess(); currentProcess.setProcessStatus(status); currentProcess.setEndOfProcess(new Date()); processRepository.save(currentProcess); LOG.info("Update the status of the latest process: {}", currentProcess.toString()); return currentProcess; } /** * Get the start date of the latest process * @return the start date of the latest process */ public Date getLastProcessStartDate(){ Process latestProcess = processRepository.getLatestProcess(); if(latestProcess != null){ return latestProcess.getStartOfProcess(); } return null; } /** * Get the start date of the latest process of the specified type * @param type the process type requested * @return the start date of the latest process of the specified type */ public Date getLastProcessStartDateByType(ProcessType type){ Process latestProcess = processRepository.getLatestProcessByType(type); if(latestProcess != null){ return latestProcess.getStartOfProcess(); } return null; } }