diff --git a/src/main/java/org/icatproject/ids/DfInfoImpl.java b/src/main/java/org/icatproject/ids/DfInfoImpl.java
deleted file mode 100644
index 214d6aba..00000000
--- a/src/main/java/org/icatproject/ids/DfInfoImpl.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.icatproject.ids;
-
-import org.icatproject.ids.plugin.DfInfo;
-
-public class DfInfoImpl implements DfInfo, Comparable<DfInfoImpl> {
-
-    private String createId;
-
-    private long dfId;
-
-    private String dfLocation;
-
-    private String dfName;
-
-    private long dsId;
-
-    private String modId;
-
-    public DfInfoImpl(long dfId, String dfName, String dfLocation, String createId, String modId, long dsId) {
-        this.dfId = dfId;
-        this.dfName = dfName;
-        this.dfLocation = dfLocation;
-        this.createId = createId;
-        this.modId = modId;
-        this.dsId = dsId;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj == null || obj.getClass() != this.getClass()) {
-            return false;
-        }
-        return dfId == ((DfInfoImpl) obj).getDfId();
-    }
-
-    @Override
-    public String getCreateId() {
-        return createId;
-    }
-
-    @Override
-    public Long getDfId() {
-        return dfId;
-    }
-
-    @Override
-    public String getDfLocation() {
-        return dfLocation;
-    }
-
-    @Override
-    public String getDfName() {
-        return dfName;
-    }
-
-    public long getDsId() {
-        return dsId;
-    }
-
-    @Override
-    public String getModId() {
-        return modId;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) (dfId ^ (dfId >>> 32));
-    }
-
-    @Override
-    public String toString() {
-        return dfLocation;
-    }
-
-    @Override
-    public int compareTo(DfInfoImpl o) {
-        if (dfId > o.getDfId()) {
-            return 1;
-        }
-        if (dfId < o.getDfId()) {
-            return -1;
-        }
-        return 0;
-    }
-
-}
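The class deleted above keys identity entirely on `dfId`: `equals`, `hashCode`, and `compareTo` all ignore the other fields, and `toString` returns the storage location. That is what lets the finite state machine below use these objects as map keys and deduplicate queued operations per datafile. A minimal sketch of the collection behaviour (illustrative only; all values are invented and the class is assumed to be on the classpath in the `org.icatproject.ids` package):

```java
import java.util.TreeSet;

public class DfInfoIdentityDemo {
    public static void main(String[] args) {
        DfInfoImpl a = new DfInfoImpl(42L, "raw.nxs", "loc/a", "user1", "user1", 7L);
        DfInfoImpl b = new DfInfoImpl(42L, "other", "loc/b", "user2", "user2", 9L);
        DfInfoImpl c = new DfInfoImpl(7L, "small.txt", "loc/c", "user1", "user1", 7L);

        // Equality considers dfId only, so a and b are "the same" datafile
        System.out.println(a.equals(b)); // true

        // TreeSet orders by dfId and deduplicates on it
        TreeSet<DfInfoImpl> set = new TreeSet<>();
        set.add(a);
        set.add(b); // dropped: compareTo treats it as equal to a
        set.add(c);
        System.out.println(set); // [loc/c, loc/a] -- toString prints dfLocation
    }
}
```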
diff --git a/src/main/java/org/icatproject/ids/DsInfoImpl.java b/src/main/java/org/icatproject/ids/DsInfoImpl.java
deleted file mode 100644
index 08bd21d0..00000000
--- a/src/main/java/org/icatproject/ids/DsInfoImpl.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package org.icatproject.ids;
-
-import org.icatproject.Dataset;
-import org.icatproject.Facility;
-import org.icatproject.Investigation;
-import org.icatproject.ids.exceptions.InsufficientPrivilegesException;
-import org.icatproject.ids.plugin.DsInfo;
-
-public class DsInfoImpl implements DsInfo {
-
-    private long dsId;
-
-    private String dsName;
-
-    private long facilityId;
-
-    private String facilityName;
-
-    private long invId;
-
-    private String invName;
-
-    private String visitId;
-
-    private String dsLocation;
-
-    public DsInfoImpl(Dataset ds) throws InsufficientPrivilegesException {
-        Investigation investigation = ds.getInvestigation();
-        if (investigation == null) {
-            throw new InsufficientPrivilegesException(
-                    "Probably not able to read Investigation for dataset id " + ds.getId());
-        }
-        Facility facility = investigation.getFacility();
-        if (facility == null) {
-            throw new InsufficientPrivilegesException(
-                    "Probably not able to read Facility for investigation id "
-                            + investigation.getId());
-        }
-        dsId = ds.getId();
-        dsName = ds.getName();
-        dsLocation = ds.getLocation();
-        invId = investigation.getId();
-        invName = investigation.getName();
-        visitId = investigation.getVisitId();
-        facilityId = facility.getId();
-        facilityName = facility.getName();
-    }
-
-    public DsInfoImpl(long dsId, String dsName, String dsLocation, long invId, String invName,
-            String visitId, long facilityId, String facilityName) {
-        this.dsId = dsId;
-        this.dsName = dsName;
-        this.dsLocation = dsLocation;
-        this.invId = invId;
-        this.invName = invName;
-        this.visitId = visitId;
-        this.facilityId = facilityId;
-        this.facilityName = facilityName;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj == null || obj.getClass() != this.getClass()) {
-            return false;
-        }
-        return dsId == ((DsInfoImpl) obj).getDsId();
-    }
-
-    @Override
-    public Long getDsId() {
-        return dsId;
-    }
-
-    @Override
-    public String getDsName() {
-        return dsName;
-    }
-
-    @Override
-    public Long getFacilityId() {
-        return facilityId;
-    }
-
-    @Override
-    public String getFacilityName() {
-        return facilityName;
-    }
-
-    @Override
-    public Long getInvId() {
-        return invId;
-    }
-
-    @Override
-    public String getInvName() {
-        return invName;
-    }
-
-    @Override
-    public String getVisitId() {
-        return visitId;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) (dsId ^ (dsId >>> 32));
-    }
-
-    @Override
-    public String toString() {
-        return invId + "/" + dsId + " (" + facilityName + "/" + invName + "/" + visitId + "/"
-                + dsName + ")";
-    }
-
-    @Override
-    public String getDsLocation() {
-        return dsLocation;
-    }
-
-}
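Like `DfInfoImpl`, this class keys `equals`/`hashCode` on the numeric id alone; the `Dataset` constructor additionally fails fast when the `Investigation` or `Facility` cannot be read, which typically indicates missing read authorization rather than missing data. A sketch of the long-form constructor and the `toString` layout that appears throughout the logs (all ids and names invented; assumes the `org.icatproject.ids` package):

```java
public class DsInfoToStringDemo {
    public static void main(String[] args) {
        // dsId 17 in investigation 5, visit V-1, at facility LSF
        DsInfoImpl dsInfo = new DsInfoImpl(17L, "scan-001", "ds/loc", 5L, "INV-5", "V-1", 1L, "LSF");
        System.out.println(dsInfo); // prints: 5/17 (LSF/INV-5/V-1/scan-001)
    }
}
```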
diff --git a/src/main/java/org/icatproject/ids/FiniteStateMachine.java b/src/main/java/org/icatproject/ids/FiniteStateMachine.java
deleted file mode 100644
index 84894e33..00000000
--- a/src/main/java/org/icatproject/ids/FiniteStateMachine.java
+++ /dev/null
@@ -1,633 +0,0 @@
-package org.icatproject.ids;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import jakarta.annotation.PostConstruct;
-import jakarta.annotation.PreDestroy;
-import jakarta.ejb.DependsOn;
-import jakarta.ejb.EJB;
-import jakarta.ejb.Singleton;
-import jakarta.json.Json;
-import jakarta.json.stream.JsonGenerator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.icatproject.Dataset;
-import org.icatproject.ids.LockManager.Lock;
-import org.icatproject.ids.LockManager.LockInfo;
-import org.icatproject.ids.LockManager.LockType;
-import org.icatproject.ids.exceptions.InternalException;
-import org.icatproject.ids.plugin.AlreadyLockedException;
-import org.icatproject.ids.plugin.DfInfo;
-import org.icatproject.ids.plugin.DsInfo;
-import org.icatproject.ids.thread.DfArchiver;
-import org.icatproject.ids.thread.DfDeleter;
-import org.icatproject.ids.thread.DfRestorer;
-import org.icatproject.ids.thread.DfWriter;
-import org.icatproject.ids.thread.DsArchiver;
-import org.icatproject.ids.thread.DsRestorer;
-import org.icatproject.ids.thread.DsWriter;
-
-@Singleton
-@DependsOn({"LockManager"})
-public class FiniteStateMachine {
-
-    private class DfProcessQueue extends TimerTask {
-
-        @Override
-        public void run() {
-            try {
-                synchronized (deferredDfOpsQueue) {
-                    if (processOpsTime != null && System.currentTimeMillis() > processOpsTime && !deferredDfOpsQueue.isEmpty()) {
-                        processOpsTime = null;
-                        logger.debug("deferredDfOpsQueue has " + deferredDfOpsQueue.size() + " entries");
-                        List<DfInfoImpl> writes = new ArrayList<>();
-                        List<DfInfoImpl> archives = new ArrayList<>();
-                        List<DfInfoImpl> restores = new ArrayList<>();
-                        List<DfInfoImpl> deletes = new ArrayList<>();
-                        Map<Long, Lock> writeLocks = new HashMap<>();
-                        Map<Long, Lock> archiveLocks = new HashMap<>();
-                        Map<Long, Lock> restoreLocks = new HashMap<>();
-                        Map<Long, Lock> deleteLocks = new HashMap<>();
-
-                        Map<DfInfoImpl, RequestedState> newOps = new HashMap<>();
-                        final Iterator<Entry<DfInfoImpl, RequestedState>> it = deferredDfOpsQueue.entrySet().iterator();
-                        while (it.hasNext()) {
-                            Entry<DfInfoImpl, RequestedState> opEntry = it.next();
-                            DfInfoImpl dfInfo = opEntry.getKey();
-                            Long dsId = dfInfo.getDsId();
-                            DsInfo dsInfo;
-                            try {
-                                Dataset ds = (Dataset) reader.get("Dataset ds INCLUDE ds.investigation.facility", dsId);
-                                dsInfo = new DsInfoImpl(ds);
-                            } catch (Exception e) {
-                                logger.error("Could not get dsInfo {}: {}.", dsId, e.getMessage());
-                                continue;
-                            }
-                            if (!dfChanging.containsKey(dfInfo)) {
-                                final RequestedState state = opEntry.getValue();
-                                logger.debug(dfInfo + " " + state);
-                                if (state == RequestedState.WRITE_REQUESTED) {
-                                    if (!writeLocks.containsKey(dsId)) {
-                                        try {
-                                            writeLocks.put(dsId, lockManager.lock(dsInfo, LockType.SHARED));
-                                        } catch (AlreadyLockedException e) {
-                                            logger.debug("Could not acquire lock on " + dsId + ", hold back " + state);
-                                            continue;
-                                        } catch (IOException e) {
-                                            logger.error("I/O exception " + e.getMessage() + " locking " + dsId);
-                                            continue;
-                                        }
-                                    }
-                                    it.remove();
-                                    dfChanging.put(dfInfo, state);
-                                    writes.add(dfInfo);
-                                } else if (state == RequestedState.WRITE_THEN_ARCHIVE_REQUESTED) {
-                                    if (!writeLocks.containsKey(dsId)) {
-                                        try {
-                                            writeLocks.put(dsId, lockManager.lock(dsInfo, LockType.SHARED));
-                                        } catch (AlreadyLockedException e) {
-                                            logger.debug("Could not acquire lock on " + dsId + ", hold back " + state);
-                                            continue;
-                                        } catch (IOException e) {
-                                            logger.error("I/O exception " + e.getMessage() + " locking " + dsId);
-                                            continue;
-                                        }
-                                    }
-                                    it.remove();
-                                    dfChanging.put(dfInfo, RequestedState.WRITE_REQUESTED);
-                                    writes.add(dfInfo);
-                                    newOps.put(dfInfo, RequestedState.ARCHIVE_REQUESTED);
-                                } else if (state == RequestedState.ARCHIVE_REQUESTED) {
-                                    if (!archiveLocks.containsKey(dsId)) {
-                                        try {
-                                            archiveLocks.put(dsId, lockManager.lock(dsInfo, LockType.EXCLUSIVE));
-                                        } catch (AlreadyLockedException e) {
-                                            logger.debug("Could not acquire lock on " + dsId + ", hold back " + state);
-                                            continue;
-                                        } catch (IOException e) {
-                                            logger.error("I/O exception " + e.getMessage() + " locking " + dsId);
-                                            continue;
-                                        }
-                                    }
-                                    it.remove();
-                                    dfChanging.put(dfInfo, state);
-                                    archives.add(dfInfo);
-                                } else if (state == RequestedState.RESTORE_REQUESTED) {
-                                    if (!restoreLocks.containsKey(dsId)) {
-                                        try {
-                                            restoreLocks.put(dsId, lockManager.lock(dsInfo, LockType.EXCLUSIVE));
-                                        } catch (AlreadyLockedException e) {
-                                            logger.debug("Could not acquire lock on " + dsId + ", hold back " + state);
-                                            continue;
-                                        } catch (IOException e) {
-                                            logger.error("I/O exception " + e.getMessage() + " locking " + dsId);
-                                            continue;
-                                        }
-                                    }
-                                    it.remove();
-                                    dfChanging.put(dfInfo, state);
-                                    restores.add(dfInfo);
-                                } else if (state == RequestedState.DELETE_REQUESTED) {
-                                    if (!deleteLocks.containsKey(dsId)) {
-                                        try {
-                                            deleteLocks.put(dsId, lockManager.lock(dsInfo, LockType.EXCLUSIVE));
-                                        } catch (AlreadyLockedException e) {
-                                            logger.debug("Could not acquire lock on " + dsId + ", hold back " + state);
-                                            continue;
-                                        } catch (IOException e) {
-                                            logger.error("I/O exception " + e.getMessage() + " locking " + dsId);
-                                            continue;
-                                        }
-                                    }
-                                    it.remove();
-                                    dfChanging.put(dfInfo, state);
-                                    deletes.add(dfInfo);
-                                } else {
-                                    throw new AssertionError("Impossible state");
-                                }
-                            }
-                        }
-                        if (!newOps.isEmpty()) {
-                            deferredDfOpsQueue.putAll(newOps);
-                            logger.debug("Adding {} operations to be scheduled next time round", newOps.size());
-                        }
-                        if (!deferredDfOpsQueue.isEmpty()) {
-                            processOpsTime = 0L;
-                        }
-                        if (!writes.isEmpty()) {
-                            logger.debug("Launch thread to process " + writes.size() + " writes");
-                            Thread w = new Thread(new DfWriter(writes, propertyHandler, FiniteStateMachine.this, writeLocks.values()));
-                            w.start();
-                        }
-                        if (!archives.isEmpty()) {
-                            logger.debug("Launch thread to process " + archives.size() + " archives");
-                            Thread w = new Thread(new DfArchiver(archives, propertyHandler, FiniteStateMachine.this, archiveLocks.values()));
-                            w.start();
-                        }
-                        if (!restores.isEmpty()) {
-                            logger.debug("Launch thread to process " + restores.size() + " restores");
-                            Thread w = new Thread(new DfRestorer(restores, propertyHandler, FiniteStateMachine.this, restoreLocks.values()));
-                            w.start();
-                        }
-                        if (!deletes.isEmpty()) {
-                            logger.debug("Launch thread to process " + deletes.size() + " deletes");
-                            Thread w = new Thread(new DfDeleter(deletes, propertyHandler, FiniteStateMachine.this, deleteLocks.values()));
-                            w.start();
-                        }
-                    }
-                }
-            } finally {
-                timer.schedule(new DfProcessQueue(), processQueueIntervalMillis);
-            }
-
-        }
-
-    }
-
-    private class DsProcessQueue extends TimerTask {
-
-        @Override
-        public void run() {
-            try {
-                synchronized (deferredDsOpsQueue) {
-                    final long now = System.currentTimeMillis();
-                    Map<DsInfo, RequestedState> newOps = new HashMap<>();
-                    final Iterator<Entry<DsInfo, RequestedState>> it = deferredDsOpsQueue.entrySet().iterator();
-                    while (it.hasNext()) {
-                        final Entry<DsInfo, RequestedState> opEntry = it.next();
-                        final DsInfo dsInfo = opEntry.getKey();
-                        if (!dsChanging.containsKey(dsInfo)) {
-                            final RequestedState state = opEntry.getValue();
-                            if (state == RequestedState.WRITE_REQUESTED
-                                    || state == RequestedState.WRITE_THEN_ARCHIVE_REQUESTED) {
-                                if (now > writeTimes.get(dsInfo)) {
-                                    try {
-                                        Lock lock = lockManager.lock(dsInfo, LockType.SHARED);
-                                        logger.debug("Will process " + dsInfo + " with " + state);
-                                        writeTimes.remove(dsInfo);
-                                        dsChanging.put(dsInfo, RequestedState.WRITE_REQUESTED);
-                                        it.remove();
-                                        final Thread w = new Thread(
-                                                new DsWriter(dsInfo, propertyHandler, FiniteStateMachine.this, reader, lock));
-                                        w.start();
-                                        if (state == RequestedState.WRITE_THEN_ARCHIVE_REQUESTED) {
-                                            newOps.put(dsInfo, RequestedState.ARCHIVE_REQUESTED);
-                                        }
-                                    } catch (AlreadyLockedException e) {
-                                        logger.debug("Could not acquire lock on " + dsInfo + ", hold back process with " + state);
-                                    } catch (IOException e) {
-                                        logger.error("I/O exception " + e.getMessage() + " locking " + dsInfo);
-                                    }
-                                }
-                            } else if (state == RequestedState.ARCHIVE_REQUESTED) {
-                                try {
-                                    Lock lock = lockManager.lock(dsInfo, LockType.EXCLUSIVE);
-                                    it.remove();
-                                    long dsId = dsInfo.getDsId();
-                                    logger.debug("Will process " + dsInfo + " with " + state);
-                                    dsChanging.put(dsInfo, state);
-                                    final Thread w = new Thread(
-                                            new DsArchiver(dsInfo, propertyHandler, FiniteStateMachine.this, lock));
-                                    w.start();
-                                } catch (AlreadyLockedException e) {
-                                    logger.debug("Could not acquire lock on " + dsInfo + ", hold back process with " + state);
-                                } catch (IOException e) {
-                                    logger.error("I/O exception " + e.getMessage() + " locking " + dsInfo);
-                                }
-                            } else if (state == RequestedState.RESTORE_REQUESTED) {
-                                try {
-                                    Lock lock = lockManager.lock(dsInfo, LockType.EXCLUSIVE);
-                                    logger.debug("Will process " + dsInfo + " with " + state);
-                                    dsChanging.put(dsInfo, state);
-                                    it.remove();
-                                    final Thread w = new Thread(
-                                            new DsRestorer(dsInfo, propertyHandler, FiniteStateMachine.this, reader, lock));
-                                    w.start();
-                                } catch (AlreadyLockedException e) {
-                                    logger.debug("Could not acquire lock on " + dsInfo + ", hold back process with " + state);
-                                } catch (IOException e) {
-                                    logger.error("I/O exception " + e.getMessage() + " locking " + dsInfo);
-                                }
-                            }
-                        }
-                    }
-                    deferredDsOpsQueue.putAll(newOps);
-                }
-
-            } finally {
-                timer.schedule(new DsProcessQueue(), processQueueIntervalMillis);
-            }
-
-        }
-
-    }
-
-    public enum RequestedState {
-        ARCHIVE_REQUESTED, DELETE_REQUESTED, RESTORE_REQUESTED, WRITE_REQUESTED, WRITE_THEN_ARCHIVE_REQUESTED
-    }
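Both queue processors share one scheduling idiom: each pass schedules its successor in a `finally` block, so passes never overlap and a slow pass delays the next one instead of letting runs pile up (fixed-delay behaviour, in contrast to `scheduleAtFixedRate`). A `Timer` also cannot reuse a `TimerTask` once it has run, which is why a fresh instance is created each time. A standalone sketch of the idiom (names and the interval are invented):

```java
import java.util.Timer;
import java.util.TimerTask;

public class SelfReschedulingTask extends TimerTask {
    private static final Timer timer = new Timer("demo timer");
    private static final long intervalMillis = 2000L;

    @Override
    public void run() {
        try {
            System.out.println("processing queue ...");   // real work goes here
        } finally {
            // schedule the next pass only once this one has finished
            timer.schedule(new SelfReschedulingTask(), intervalMillis);
        }
    }

    public static void main(String[] args) {
        timer.schedule(new SelfReschedulingTask(), intervalMillis); // first pass
    }
}
```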
-
-    private static Logger logger = LoggerFactory.getLogger(FiniteStateMachine.class);
-
-    /*
-     * Note that the variable processOpsDelayMillis is used either to delay all deferred
-     * datafile operations or to delay dataset writes, depending on the setting of storageUnit.
-     */
-    private long processOpsDelayMillis;
-
-    private Map<DfInfoImpl, RequestedState> deferredDfOpsQueue = new HashMap<>();
-
-    private Map<DsInfo, RequestedState> deferredDsOpsQueue = new HashMap<>();
-
-    private Map<DfInfoImpl, RequestedState> dfChanging = new HashMap<>();
-
-    private Map<DsInfo, RequestedState> dsChanging = new HashMap<>();
-
-    private Path markerDir;
-    private long processQueueIntervalMillis;
-
-    private PropertyHandler propertyHandler;
-    @EJB
-    IcatReader reader;
-
-    @EJB
-    private LockManager lockManager;
-
-    private StorageUnit storageUnit;
-
-    private Timer timer = new Timer("FSM Timer");
-
-    private Long processOpsTime;
-
-    private Map<DsInfo, Long> writeTimes = new HashMap<>();
-
-    private Set<Long> failures = ConcurrentHashMap.newKeySet();
-
-    @PreDestroy
-    private void exit() {
-        timer.cancel();
-        logger.info("Cancelled timer");
-    }
-
-    /**
-     * Find any DfInfo which may be offline
-     */
-    public Set<DfInfoImpl> getDfMaybeOffline() {
-        Map<DfInfoImpl, RequestedState> union;
-        synchronized (deferredDfOpsQueue) {
-            union = new HashMap<>(dfChanging);
-            union.putAll(deferredDfOpsQueue);
-        }
-        Set<DfInfoImpl> result = new HashSet<>();
-        for (Entry<DfInfoImpl, RequestedState> entry : union.entrySet()) {
-            if (entry.getValue() != RequestedState.WRITE_REQUESTED) {
-                result.add(entry.getKey());
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Find any DfInfo which are being restored or are queued for restoration
-     */
-    public Set<DfInfoImpl> getDfRestoring() {
-        Map<DfInfoImpl, RequestedState> union;
-        synchronized (deferredDfOpsQueue) {
-            union = new HashMap<>(dfChanging);
-            union.putAll(deferredDfOpsQueue);
-        }
-        Set<DfInfoImpl> result = new HashSet<>();
-        for (Entry<DfInfoImpl, RequestedState> entry : union.entrySet()) {
-            if (entry.getValue() == RequestedState.RESTORE_REQUESTED) {
-                result.add(entry.getKey());
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Find any DsInfo which may be offline
-     */
-    public Set<DsInfo> getDsMaybeOffline() {
-        Map<DsInfo, RequestedState> union;
-        synchronized (deferredDsOpsQueue) {
-            union = new HashMap<>(dsChanging);
-            union.putAll(deferredDsOpsQueue);
-        }
-        Set<DsInfo> result = new HashSet<>();
-        for (Entry<DsInfo, RequestedState> entry : union.entrySet()) {
-            if (entry.getValue() != RequestedState.WRITE_REQUESTED) {
-                result.add(entry.getKey());
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Find any DsInfo which are being restored or are queued for restoration
-     */
-    public Set<DsInfo> getDsRestoring() {
-        Map<DsInfo, RequestedState> union;
-        synchronized (deferredDsOpsQueue) {
-            union = new HashMap<>(dsChanging);
-            union.putAll(deferredDsOpsQueue);
-        }
-        Set<DsInfo> result = new HashSet<>();
-        for (Entry<DsInfo, RequestedState> entry : union.entrySet()) {
-            if (entry.getValue() == RequestedState.RESTORE_REQUESTED) {
-                result.add(entry.getKey());
-            }
-        }
-        return result;
-    }
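The four getters above all use the same idiom: take a snapshot union of the "changing" and "queued" maps while holding the queue monitor, then filter the snapshot outside any locking, with queued entries winning on key clashes. A generic sketch of that pattern (illustrative only; assumes nothing beyond the JDK):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SnapshotUnionDemo<K, V> {
    private final Map<K, V> queued = new HashMap<>();
    private final Map<K, V> changing = new HashMap<>();

    Set<K> keysWhere(V wanted) {
        Map<K, V> union;
        synchronized (queued) {              // one monitor guards both maps
            union = new HashMap<>(changing); // snapshot of in-flight entries ...
            union.putAll(queued);            // ... queued entries override on clashes
        }
        Set<K> result = new HashSet<>();     // filtering runs lock-free on the snapshot
        for (Map.Entry<K, V> e : union.entrySet()) {
            if (e.getValue() == wanted) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```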
-
-    public String getServiceStatus() throws InternalException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (JsonGenerator gen = Json.createGenerator(baos).writeStartObject()) {
-            if (storageUnit == null) {
-                gen.writeStartArray("opsQueue").writeEnd();
-            } else if (storageUnit == StorageUnit.DATASET) {
-                Map<DsInfo, RequestedState> union;
-                synchronized (deferredDsOpsQueue) {
-                    union = new HashMap<>(dsChanging);
-                    union.putAll(deferredDsOpsQueue);
-                }
-                gen.writeStartArray("opsQueue");
-                for (Entry<DsInfo, RequestedState> entry : union.entrySet()) {
-                    DsInfo item = entry.getKey();
-                    gen.writeStartObject().write("data", item.toString()).write("request", entry.getValue().name())
-                            .writeEnd();
-                }
-                gen.writeEnd(); // end Array("opsQueue")
-            } else if (storageUnit == StorageUnit.DATAFILE) {
-                Map<DfInfoImpl, RequestedState> union;
-                synchronized (deferredDfOpsQueue) {
-                    union = new HashMap<>(dfChanging);
-                    union.putAll(deferredDfOpsQueue);
-                }
-                gen.writeStartArray("opsQueue");
-                for (Entry<DfInfoImpl, RequestedState> entry : union.entrySet()) {
-                    DfInfo item = entry.getKey();
-                    gen.writeStartObject().write("data", item.toString()).write("request", entry.getValue().name())
-                            .writeEnd();
-                }
-                gen.writeEnd(); // end Array("opsQueue")
-            }
-
-            Collection<LockInfo> lockInfo = lockManager.getLockInfo();
-            gen.write("lockCount", lockInfo.size());
-            gen.writeStartArray("locks");
-            for (LockInfo li : lockInfo) {
-                gen.writeStartObject().write("id", li.id).write("type", li.type.name()).write("count", li.count).writeEnd();
-            }
-            gen.writeEnd(); // end Array("locks")
-
-            gen.writeStartArray("failures");
-            for (Long failure : failures) {
-                gen.write(failure);
-            }
-            gen.writeEnd(); // end Array("failures")
-
-            gen.writeEnd(); // end Object()
-        }
-        return baos.toString();
-    }
-
-    @PostConstruct
-    private void init() {
-        try {
-            propertyHandler = PropertyHandler.getInstance();
-            processQueueIntervalMillis = propertyHandler.getProcessQueueIntervalSeconds() * 1000L;
-            storageUnit = propertyHandler.getStorageUnit();
-            if (storageUnit == StorageUnit.DATASET) {
-                processOpsDelayMillis = propertyHandler.getDelayDatasetWrites() * 1000L;
-                timer.schedule(new DsProcessQueue(), processQueueIntervalMillis);
-                logger.info("DsProcessQueue scheduled to run in " + processQueueIntervalMillis + " milliseconds");
-            } else if (storageUnit == StorageUnit.DATAFILE) {
-                processOpsDelayMillis = propertyHandler.getDelayDatafileOperations() * 1000L;
-                timer.schedule(new DfProcessQueue(), processQueueIntervalMillis);
-                logger.info("DfProcessQueue scheduled to run in " + processQueueIntervalMillis + " milliseconds");
-            }
-            markerDir = propertyHandler.getCacheDir().resolve("marker");
-            Files.createDirectories(markerDir);
-        } catch (IOException e) {
-            throw new RuntimeException("FiniteStateMachine reports " + e.getClass() + " " + e.getMessage());
-        }
-    }
-
-    public void queue(DfInfoImpl dfInfo, DeferredOp deferredOp) throws InternalException {
-        logger.info("Requesting " + deferredOp + " of datafile " + dfInfo);
-
-        synchronized (deferredDfOpsQueue) {
-
-            if (processOpsTime == null) {
-                processOpsTime = System.currentTimeMillis() + processOpsDelayMillis;
-                final Date d = new Date(processOpsTime);
-                logger.debug("Requesting delay operations till " + d);
-            }
-
-            final RequestedState state = this.deferredDfOpsQueue.get(dfInfo);
-            if (state == null) {
-                if (deferredOp == DeferredOp.WRITE) {
-                    try {
-                        Path marker = markerDir.resolve(Long.toString(dfInfo.getDfId()));
-                        Files.createFile(marker);
-                        logger.debug("Created marker " + marker);
-                    } catch (FileAlreadyExistsException e) {
-                        // Pass will ignore this
-                    } catch (IOException e) {
-                        throw new InternalException(e.getClass() + " " + e.getMessage());
-                    }
-                    deferredDfOpsQueue.put(dfInfo, RequestedState.WRITE_REQUESTED);
-                } else if (deferredOp == DeferredOp.ARCHIVE) {
-                    deferredDfOpsQueue.put(dfInfo, RequestedState.ARCHIVE_REQUESTED);
-                } else if (deferredOp == DeferredOp.RESTORE) {
-                    deferredDfOpsQueue.put(dfInfo, RequestedState.RESTORE_REQUESTED);
-                } else if (deferredOp == DeferredOp.DELETE) {
-                    deferredDfOpsQueue.put(dfInfo, RequestedState.DELETE_REQUESTED);
-                }
-            } else if (state == RequestedState.ARCHIVE_REQUESTED) {
-                if (deferredOp == DeferredOp.RESTORE) {
-                    deferredDfOpsQueue.remove(dfInfo);
-                } else if (deferredOp == DeferredOp.DELETE) {
-                    deferredDfOpsQueue.put(dfInfo, RequestedState.DELETE_REQUESTED);
-                }
-            } else if (state == RequestedState.DELETE_REQUESTED) {
-                // No way out
-            } else if (state == RequestedState.RESTORE_REQUESTED) {
-                if (deferredOp == DeferredOp.DELETE) {
-                    deferredDfOpsQueue.put(dfInfo, RequestedState.DELETE_REQUESTED);
-                } else if (deferredOp == DeferredOp.ARCHIVE) {
-                    deferredDfOpsQueue.put(dfInfo, RequestedState.ARCHIVE_REQUESTED);
-                }
-            } else if (state == RequestedState.WRITE_REQUESTED) {
-                if (deferredOp == DeferredOp.DELETE) {
-                    deferredDfOpsQueue.remove(dfInfo);
-                } else if (deferredOp == DeferredOp.ARCHIVE) {
-                    deferredDfOpsQueue.put(dfInfo, RequestedState.WRITE_THEN_ARCHIVE_REQUESTED);
-                }
-            } else if (state == RequestedState.WRITE_THEN_ARCHIVE_REQUESTED) {
-                if (deferredOp == DeferredOp.DELETE) {
-                    deferredDfOpsQueue.remove(dfInfo);
-                } else if (deferredOp == DeferredOp.RESTORE) {
-                    deferredDfOpsQueue.put(dfInfo, RequestedState.WRITE_REQUESTED);
-                }
-            }
-        }
-    }
-
-    public void queue(DsInfo dsInfo, DeferredOp deferredOp) throws InternalException {
-        logger.info("Requesting " + deferredOp + " of dataset " + dsInfo);
-
-        synchronized (deferredDsOpsQueue) {
-
-            final RequestedState state = this.deferredDsOpsQueue.get(dsInfo);
-            if (state == null) {
-                if (deferredOp == DeferredOp.WRITE) {
-                    requestWrite(dsInfo);
-                } else if (deferredOp == DeferredOp.ARCHIVE) {
-                    deferredDsOpsQueue.put(dsInfo, RequestedState.ARCHIVE_REQUESTED);
-                } else if (deferredOp == DeferredOp.RESTORE) {
-                    deferredDsOpsQueue.put(dsInfo, RequestedState.RESTORE_REQUESTED);
-                }
-            } else if (state == RequestedState.ARCHIVE_REQUESTED) {
-                if (deferredOp == DeferredOp.WRITE) {
-                    requestWrite(dsInfo);
-                    deferredDsOpsQueue.put(dsInfo, RequestedState.WRITE_THEN_ARCHIVE_REQUESTED);
-                } else if (deferredOp == DeferredOp.RESTORE) {
-                    deferredDsOpsQueue.put(dsInfo, RequestedState.RESTORE_REQUESTED);
-                }
-            } else if (state == RequestedState.RESTORE_REQUESTED) {
-                if (deferredOp == DeferredOp.WRITE) {
-                    requestWrite(dsInfo);
-                } else if (deferredOp == DeferredOp.ARCHIVE) {
-                    deferredDsOpsQueue.put(dsInfo, RequestedState.ARCHIVE_REQUESTED);
-                }
-            } else if (state == RequestedState.WRITE_REQUESTED) {
-                if (deferredOp == DeferredOp.WRITE) {
-                    setDelay(dsInfo);
-                } else if (deferredOp == DeferredOp.ARCHIVE) {
-                    deferredDsOpsQueue.put(dsInfo, RequestedState.WRITE_THEN_ARCHIVE_REQUESTED);
-                }
-            } else if (state == RequestedState.WRITE_THEN_ARCHIVE_REQUESTED) {
-                if (deferredOp == DeferredOp.WRITE) {
-                    setDelay(dsInfo);
-                } else if (deferredOp == DeferredOp.RESTORE) {
-                    deferredDsOpsQueue.put(dsInfo, RequestedState.WRITE_REQUESTED);
-                }
-            }
-        }
-
-    }
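The `queue` methods collapse successive requests for the same entity into a single pending state: an ARCHIVE queued behind a pending WRITE becomes `WRITE_THEN_ARCHIVE_REQUESTED`, a DELETE cancels a pending WRITE outright, and `DELETE_REQUESTED` has no way out. The datafile rules, restated as a pure function for clarity (a sketch mirroring the branches above, not the production code; `Optional.empty()` means the queued entry is removed):

```java
import java.util.Optional;

public class DfTransitionDemo {
    enum Op { WRITE, ARCHIVE, RESTORE, DELETE }
    enum State { WRITE_REQUESTED, WRITE_THEN_ARCHIVE_REQUESTED, ARCHIVE_REQUESTED, RESTORE_REQUESTED, DELETE_REQUESTED }

    /** New queued state after op arrives, or empty when the entry is dropped. */
    static Optional<State> next(State s, Op op) {
        switch (s) {
            case ARCHIVE_REQUESTED:
                if (op == Op.RESTORE) return Optional.empty();
                if (op == Op.DELETE) return Optional.of(State.DELETE_REQUESTED);
                break;
            case DELETE_REQUESTED:
                break; // no way out
            case RESTORE_REQUESTED:
                if (op == Op.DELETE) return Optional.of(State.DELETE_REQUESTED);
                if (op == Op.ARCHIVE) return Optional.of(State.ARCHIVE_REQUESTED);
                break;
            case WRITE_REQUESTED:
                if (op == Op.DELETE) return Optional.empty();
                if (op == Op.ARCHIVE) return Optional.of(State.WRITE_THEN_ARCHIVE_REQUESTED);
                break;
            case WRITE_THEN_ARCHIVE_REQUESTED:
                if (op == Op.DELETE) return Optional.empty();
                if (op == Op.RESTORE) return Optional.of(State.WRITE_REQUESTED);
                break;
        }
        return Optional.of(s); // request absorbed, state unchanged
    }

    public static void main(String[] args) {
        System.out.println(next(State.WRITE_REQUESTED, Op.ARCHIVE));  // Optional[WRITE_THEN_ARCHIVE_REQUESTED]
        System.out.println(next(State.DELETE_REQUESTED, Op.RESTORE)); // Optional[DELETE_REQUESTED]
    }
}
```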
-
-    public void removeFromChanging(DfInfo dfInfo) {
-        synchronized (deferredDfOpsQueue) {
-            dfChanging.remove(dfInfo);
-        }
-    }
-
-    public void removeFromChanging(DsInfo dsInfo) {
-        synchronized (deferredDsOpsQueue) {
-            dsChanging.remove(dsInfo);
-        }
-    }
-
-    private void requestWrite(DsInfo dsInfo) throws InternalException {
-        try {
-            Path marker = markerDir.resolve(Long.toString(dsInfo.getDsId()));
-            Files.createFile(marker);
-            logger.debug("Created marker " + marker);
-        } catch (FileAlreadyExistsException e) {
-            // Pass will ignore this
-        } catch (IOException e) {
-            throw new InternalException(e.getClass() + " " + e.getMessage());
-        }
-        deferredDsOpsQueue.put(dsInfo, RequestedState.WRITE_REQUESTED);
-        setDelay(dsInfo);
-    }
-
-    private void setDelay(DsInfo dsInfo) {
-        writeTimes.put(dsInfo, System.currentTimeMillis() + processOpsDelayMillis);
-        if (logger.isDebugEnabled()) {
-            final Date d = new Date(writeTimes.get(dsInfo));
-            logger.debug("Requesting delay of writing of dataset " + dsInfo + " till " + d);
-        }
-    }
-
-    public void recordSuccess(Long id) {
-        if (failures.remove(id)) {
-            logger.debug("Marking {} OK", id);
-        }
-    }
-
-    public void recordFailure(Long id) {
-        if (failures.add(id)) {
-            logger.debug("Marking {} as failure", id);
-        }
-    }
-
-    public void checkFailure(Long id) throws InternalException {
-        if (failures.contains(id)) {
-            throw new InternalException("Restore failed");
-        }
-    }
-
-}
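One detail worth noting from the file above: pending writes are recorded as empty marker files under `cacheDir/marker`, one per entity id, so they survive a server restart; creation is made idempotent by swallowing `FileAlreadyExistsException`. A standalone sketch of the idiom (paths and the id are invented):

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class MarkerDemo {
    public static void main(String[] args) throws IOException {
        Path markerDir = Path.of("/tmp/ids-markers"); // real code resolves cacheDir/marker
        Files.createDirectories(markerDir);
        Path marker = markerDir.resolve(Long.toString(42L)); // one marker per entity id
        try {
            Files.createFile(marker); // atomic create-if-absent
        } catch (FileAlreadyExistsException e) {
            // a write is already pending for this id -- nothing to do
        }
    }
}
```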
diff --git a/src/main/java/org/icatproject/ids/IdsBean.java b/src/main/java/org/icatproject/ids/IdsBean.java
deleted file mode 100644
index a6cf2c25..00000000
--- a/src/main/java/org/icatproject/ids/IdsBean.java
+++ /dev/null
@@ -1,2215 +0,0 @@
-package org.icatproject.ids;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.security.NoSuchAlgorithmException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.OptionalLong;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-import java.util.zip.CRC32;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipException;
-import java.util.zip.ZipOutputStream;
-import javax.xml.datatype.DatatypeFactory;
-
-import jakarta.annotation.PostConstruct;
-import jakarta.ejb.EJB;
-import jakarta.ejb.Stateless;
-import jakarta.json.Json;
-import jakarta.json.JsonNumber;
-import jakarta.json.JsonObject;
-import jakarta.json.JsonReader;
-import jakarta.json.JsonValue;
-import jakarta.json.stream.JsonGenerator;
-import static jakarta.ws.rs.core.HttpHeaders.CONTENT_LENGTH;
-import jakarta.ws.rs.core.Response;
-import jakarta.ws.rs.core.StreamingOutput;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.icatproject.Datafile;
-import org.icatproject.DatafileFormat;
-import org.icatproject.Dataset;
-import org.icatproject.EntityBaseBean;
-import org.icatproject.ICAT;
-import org.icatproject.IcatExceptionType;
-import org.icatproject.IcatException_Exception;
-import org.icatproject.ids.DataSelection.Returns;
-import org.icatproject.ids.LockManager.Lock;
-import org.icatproject.ids.LockManager.LockType;
-import org.icatproject.ids.exceptions.BadRequestException;
-import org.icatproject.ids.exceptions.DataNotOnlineException;
-import org.icatproject.ids.exceptions.IdsException;
-import org.icatproject.ids.exceptions.InsufficientPrivilegesException;
-import org.icatproject.ids.exceptions.InternalException;
-import org.icatproject.ids.exceptions.NotFoundException;
-import org.icatproject.ids.exceptions.NotImplementedException;
-import org.icatproject.ids.plugin.AlreadyLockedException;
-import org.icatproject.ids.plugin.ArchiveStorageInterface;
-import org.icatproject.ids.plugin.DfInfo;
-import org.icatproject.ids.plugin.DsInfo;
-import org.icatproject.ids.plugin.MainStorageInterface;
-import org.icatproject.ids.plugin.ZipMapperInterface;
-import org.icatproject.utils.IcatSecurity;
-import org.icatproject.utils.ShellCommand;
-
-@Stateless
-public class IdsBean {
-
-    public class RunPrepDsCheck implements Callable<Void> {
-
-        private Collection<DsInfo> toCheck;
-        private Set<Long> emptyDatasets;
-
-        public RunPrepDsCheck(Collection<DsInfo> toCheck, Set<Long> emptyDatasets) {
-            this.toCheck = toCheck;
-            this.emptyDatasets = emptyDatasets;
-        }
-
-        @Override
-        public Void call() throws Exception {
-            for (DsInfo dsInfo : toCheck) {
-                fsm.checkFailure(dsInfo.getDsId());
-                restoreIfOffline(dsInfo, emptyDatasets);
-            }
-            return null;
-        }
-
-    }
-
-    public class RunPrepDfCheck implements Callable<Void> {
-
-        private SortedSet<DfInfoImpl> toCheck;
-
-        public RunPrepDfCheck(SortedSet<DfInfoImpl> toCheck) {
-            this.toCheck = toCheck;
-        }
-
-        @Override
-        public Void call() throws Exception {
-            for (DfInfoImpl dfInfo : toCheck) {
-                fsm.checkFailure(dfInfo.getDfId());
-                restoreIfOffline(dfInfo);
-            }
-            return null;
-        }
-
-    }
-
-    enum CallType {
-        INFO, PREPARE, READ, WRITE, MIGRATE
-    }
-
-    public class RestoreDfTask implements Callable<Void> {
-
-        private Set<DfInfoImpl> dfInfos;
-
-        public RestoreDfTask(Set<DfInfoImpl> dfInfos) {
-            this.dfInfos = dfInfos;
-        }
-
-        @Override
-        public Void call() throws Exception {
-            for (DfInfoImpl dfInfo : dfInfos) {
-                restoreIfOffline(dfInfo);
-            }
-            return null;
-        }
-
-    }
-
-    public class RestoreDsTask implements Callable<Void> {
-        private Collection<DsInfo> dsInfos;
-        private Set<Long> emptyDs;
-
-        public RestoreDsTask(Collection<DsInfo> dsInfos, Set<Long> emptyDs) {
-            this.dsInfos = dsInfos;
-            this.emptyDs = emptyDs;
-        }
-
-        @Override
-        public Void call() throws Exception {
-            for (DsInfo dsInfo : dsInfos) {
-                restoreIfOffline(dsInfo, emptyDs);
-            }
-            return null;
-        }
-    }
-
-    private class SO implements StreamingOutput {
-
-        private long offset;
-        private boolean zip;
-        private Map<Long, DsInfo> dsInfos;
-        private Lock lock;
-        private boolean compress;
-        private Set<DfInfoImpl> dfInfos;
-        private String ip;
-        private long start;
-        private Long transferId;
-
-        SO(Map<Long, DsInfo> dsInfos, Set<DfInfoImpl> dfInfos, long offset, boolean zip, boolean compress,
-                Lock lock, Long transferId, String ip, long start) {
-            this.offset = offset;
-            this.zip = zip;
-            this.dsInfos = dsInfos;
-            this.dfInfos = dfInfos;
-            this.lock = lock;
-            this.compress = compress;
-            this.transferId = transferId;
-            this.ip = ip;
-            this.start = start;
-        }
-
-        @Override
-        public void write(OutputStream output) throws IOException {
-            Object transfer = "??";
-            try {
-                if (offset != 0) { // Wrap the stream if needed
-                    output = new RangeOutputStream(output, offset, null);
-                }
-                byte[] bytes = new byte[BUFSIZ];
-                if (zip) {
-                    ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(output));
-                    if (!compress) {
-                        zos.setLevel(0); // Otherwise use default compression
-                    }
-
-                    for (DfInfoImpl dfInfo : dfInfos) {
-                        logger.debug("Adding " + dfInfo + " to zip");
-                        transfer = dfInfo;
-                        DsInfo dsInfo = dsInfos.get(dfInfo.getDsId());
-                        String entryName = zipMapper.getFullEntryName(dsInfo, dfInfo);
-                        InputStream stream = null;
-                        try {
-                            zos.putNextEntry(new ZipEntry(entryName));
-                            stream = mainStorage.get(dfInfo.getDfLocation(), dfInfo.getCreateId(), dfInfo.getModId());
-                            int length;
-                            while ((length = stream.read(bytes)) >= 0) {
-                                zos.write(bytes, 0, length);
-                            }
-                        } catch (ZipException e) {
-                            logger.debug("Skipped duplicate");
-                        }
-                        zos.closeEntry();
-                        if (stream != null) {
-                            stream.close();
-                        }
-                    }
-                    zos.close();
-                } else {
-                    DfInfoImpl dfInfo = dfInfos.iterator().next();
-                    transfer = dfInfo;
-                    InputStream stream = mainStorage.get(dfInfo.getDfLocation(), dfInfo.getCreateId(),
-                            dfInfo.getModId());
-                    int length;
-                    while ((length = stream.read(bytes)) >= 0) {
-                        output.write(bytes, 0, length);
-                    }
-                    output.close();
-                    stream.close();
-                }
-
-                if (transferId != null) {
-                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                    try (JsonGenerator gen = Json.createGenerator(baos).writeStartObject()) {
-                        gen.write("transferId", transferId);
-                        gen.writeEnd();
-                    }
-                    transmitter.processMessage("getData", ip, baos.toString(), start);
-                }
-
-            } catch (IOException e) {
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                try (JsonGenerator gen = Json.createGenerator(baos).writeStartObject()) {
-                    gen.write("transferId", transferId);
-                    gen.write("exceptionClass", e.getClass().toString());
-                    gen.write("exceptionMessage", e.getMessage());
-                    gen.writeEnd();
-                }
-                transmitter.processMessage("getData", ip, baos.toString(), start);
-                logger.error("Failed to stream " + transfer + " due to " + e.getMessage());
-                throw e;
-            } finally {
-                lock.release();
-            }
-        }
-
-    }
-
-    private static final int BUFSIZ = 2048;
-
-    private static Boolean inited = false;
-
-    private static String key;
-
-    private final static Logger logger = LoggerFactory.getLogger(IdsBean.class);
-    private static String paddedPrefix;
-    private static final String prefix = "";
-
-    /**
-     * matches standard UUID format of 8-4-4-4-12 hexadecimal digits
-     */
-    public static final Pattern uuidRegExp = Pattern
-            .compile("^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$");
-
-    static {
-        paddedPrefix = "";
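The hunk breaks off in IdsBean's static initializers, but the heart of the deleted `SO.write` is the zip-streaming loop above: one `ZipEntry` per datafile, compression level 0 ("store") when `compress` is false, and duplicate entry names skipped via `ZipException`. A self-contained sketch of that core loop, using plain filesystem paths in place of the `mainStorage` plugin (illustrative only; the helper name and 2048-byte buffer mirror `BUFSIZ` above):

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class ZipStreamDemo {
    static void writeZip(OutputStream output, List<Path> files, boolean compress) throws IOException {
        byte[] bytes = new byte[2048];                       // BUFSIZ in the real code
        ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(output));
        if (!compress) {
            zos.setLevel(0);                                 // store entries, don't deflate
        }
        for (Path file : files) {
            zos.putNextEntry(new ZipEntry(file.getFileName().toString()));
            try (InputStream stream = Files.newInputStream(file)) {
                int length;
                while ((length = stream.read(bytes)) >= 0) { // copy one file into the entry
                    zos.write(bytes, 0, length);
                }
            }
            zos.closeEntry();
        }
        zos.close();                                         // finishes the zip and flushes
    }
}
```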