diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..79feeb7 --- /dev/null +++ b/pom.xml @@ -0,0 +1,57 @@ + + 4.0.0 + + ru.spbau.aastarkova + homeworks + 1.0-SNAPSHOT + + + + junit + junit + 4.12 + test + + + com.google.guava + guava + 19.0 + + + commons-io + commons-io + 2.4 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + -Xlint:all + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + -Xlint:all + + + + + + diff --git a/src/main/java/ru/spbau/mit/Client.java b/src/main/java/ru/spbau/mit/Client.java new file mode 100644 index 0000000..bbf2594 --- /dev/null +++ b/src/main/java/ru/spbau/mit/Client.java @@ -0,0 +1,58 @@ +package ru.spbau.mit; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +public class Client extends ClientBase { + public Client(ClientInfo info) { + super(info); + } + + public List list() throws IOException { + try (TrackerConnection connection = connectToTracker()) { + connection.sendListRequest(); + return connection.readListResponse(); + } + } + + public boolean get(int id) throws Exception { + try (MyLock myLock = MyLock.lock(info.lock.readLock())) { + if (info.files.containsKey(id)) { + return false; + } + } + FileDescriptor serverDescriptor = list().stream() + .filter(descriptor -> descriptor.getFileId() == id) + .findAny().orElse(null); + if (serverDescriptor == null) { + return false; + } + try (MyLock myLock = MyLock.lock(info.lock.writeLock())) { + info.files.put(id, new ClientInfo.FileInfo(serverDescriptor, null, info.workingDirectory)); + } + return true; + } + + public FileDescriptor newFile(Path path) throws IOException { + if (!Files.isRegularFile(path)) { + throw new IllegalArgumentException("File not exists or is not a regular file."); + } + + FileDescriptor newDescriptor = new FileDescriptor(Files.size(path), path.getFileName().toString()); + try (TrackerConnection connection = connectToTracker()) { + connection.sendUploadRequest(newDescriptor); + int newId = connection.readUploadResponse(); + newDescriptor = newDescriptor.setId(newId); + } + ClientInfo.FileInfo newInfo = new ClientInfo.FileInfo(newDescriptor, path, null); + try (MyLock myLock = MyLock.lock(info.lock.writeLock())) { + info.files.put(newDescriptor.getFileId(), newInfo); + } catch (Exception e) { + e.printStackTrace(); + } + return newDescriptor; + } + +} diff --git a/src/main/java/ru/spbau/mit/ClientBase.java b/src/main/java/ru/spbau/mit/ClientBase.java new file mode 100644 index 0000000..5cbe0c4 --- /dev/null +++ b/src/main/java/ru/spbau/mit/ClientBase.java @@ -0,0 +1,16 @@ +package ru.spbau.mit; + +import java.io.IOException; +import java.net.Socket; + +public abstract class ClientBase { + protected ClientInfo info; + + public ClientBase(ClientInfo info) { + this.info = info; + } + + protected TrackerConnection connectToTracker() throws IOException { + return new TrackerConnection(new Socket(info.host, TrackerConnection.PORT)); + } +} diff --git a/src/main/java/ru/spbau/mit/ClientDescriptor.java b/src/main/java/ru/spbau/mit/ClientDescriptor.java new file mode 100644 index 0000000..314a237 --- /dev/null +++ b/src/main/java/ru/spbau/mit/ClientDescriptor.java @@ -0,0 +1,45 @@ +package ru.spbau.mit; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by Анастасия on 10.04.2016. + */ +public final class ClientDescriptor { + private final InetSocketAddress address; + private final List idList; + + public ClientDescriptor(InetSocketAddress address, List idList) { + this.address = address; + this.idList = idList; + } + + public InetSocketAddress getAddress() { + return address; + } + + public List getIdList() { + return idList; + } + + public void writeInfoToOutputStream(DataOutputStream outputStream) throws IOException { + //Write address to output stream + ReadWriteHelper.writeAddress(outputStream, address); + //Write idList to output stream + ReadWriteHelper.writeCollection(outputStream, idList, DataOutputStream::writeInt); + } + + public static ClientDescriptor readInfoFromInputStream(DataInputStream inputStream) throws IOException { + return new ClientDescriptor( + //Read address from input stream + ReadWriteHelper.readAddress(inputStream), + //Read idList from input stream + ReadWriteHelper.readCollection(inputStream, new ArrayList<>(), DataInputStream::readInt)); + } + +} diff --git a/src/main/java/ru/spbau/mit/ClientInfo.java b/src/main/java/ru/spbau/mit/ClientInfo.java new file mode 100644 index 0000000..d3eb145 --- /dev/null +++ b/src/main/java/ru/spbau/mit/ClientInfo.java @@ -0,0 +1,130 @@ +package ru.spbau.mit; + +import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class ClientInfo implements AutoCloseable { + private static final String INFO_FILE = "client-info.dat"; + private static final String DOWNLOADS_DIR = "downloads"; + + Path workingDirectory; + ReadWriteLock lock = new ReentrantReadWriteLock(); + Map files; + String host; + + public ClientInfo(String host, Path workingDirectory) throws IOException { + this(workingDirectory); + this.host = host; + } + + public ClientInfo(Path workingDirectory) throws IOException { + this.workingDirectory = workingDirectory; + load(); + } + + public static void removeInfo(Path workingDirectory) { + Path info = workingDirectory.resolve(INFO_FILE); + if (Files.exists(info)) { + try { + Files.delete(info); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + @Override + public void close() throws Exception { + store(); + } + + static final class FileInfo { + ReadWriteLock fileLock = new ReentrantReadWriteLock(); + FileDescriptor descriptor; + PartsBitset parts; + Path localPath; + + FileInfo(FileDescriptor descriptor, Path localPath, Path workingDirectory) throws IOException { + this(descriptor, new PartsBitset(descriptor.getNumberOfTheParts(), localPath != null), + localPath, workingDirectory); + } + + FileInfo(FileDescriptor descriptor, PartsBitset parts, Path localPath, + Path workingDirectory) throws IOException { + this.descriptor = descriptor; + this.parts = parts; + if (localPath == null) { + this.localPath = workingDirectory.resolve(Paths.get( + DOWNLOADS_DIR, + Integer.toString(descriptor.getFileId()), + descriptor.getFileName() + )); + Files.createDirectories(this.localPath.getParent()); + try (RandomAccessFile file = new RandomAccessFile(this.localPath.toString(), "rw")) { + file.setLength(descriptor.getFileSize()); + } + } else { + this.localPath = localPath; + } + } + + private void writeDataToOutputStream(DataOutputStream outputStream) throws IOException { + descriptor.writeInfoToOutputStream(outputStream); + parts.writeDataToOutputStream(outputStream); + outputStream.writeUTF(localPath.toString()); + } + + private static FileInfo readDataFromInputStream(DataInputStream inputStream) throws IOException { + FileDescriptor fileDescriptor = FileDescriptor.readInfoFromInputStream(inputStream, true); + PartsBitset parts = PartsBitset.readDataFromInputStream(inputStream, fileDescriptor.getNumberOfTheParts()); + String localPath = inputStream.readUTF(); + return new FileInfo(fileDescriptor, parts, Paths.get(localPath), null); + } + } + + private void store() throws IOException { + Path info = workingDirectory.resolve(INFO_FILE); + if (!Files.exists(info)) { + Files.createDirectories(workingDirectory); + Files.createFile(info); + } + try (DataOutputStream outputStream = new DataOutputStream(Files.newOutputStream(info))) { + outputStream.writeUTF(host); + ReadWriteHelper.writeCollection(outputStream, files.values(), + (outputStream1, w) -> w.writeDataToOutputStream(outputStream1)); + } + } + + private void load() throws IOException { + Path info = workingDirectory.resolve(INFO_FILE); + if (Files.exists(info)) { + try (DataInputStream inputStream = new DataInputStream(Files.newInputStream(info))) { + host = inputStream.readUTF(); + files = ReadWriteHelper.readCollection(inputStream, new HashSet<>(), FileInfo::readDataFromInputStream) + .stream() + .collect(Collectors.toMap( + fileInfo -> fileInfo.descriptor.getFileId(), + Function.identity() + + )); + } + } else { + host = ""; + files = new HashMap<>(); + } + } +} diff --git a/src/main/java/ru/spbau/mit/Connection.java b/src/main/java/ru/spbau/mit/Connection.java new file mode 100644 index 0000000..f4bdfb3 --- /dev/null +++ b/src/main/java/ru/spbau/mit/Connection.java @@ -0,0 +1,67 @@ +package ru.spbau.mit; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Collection; + +/** + * Created by Анастасия on 10.04.2016. + */ +public abstract class Connection implements AutoCloseable { + private Socket socket; + private DataInputStream inputStream; + private DataOutputStream outputStream; + + //Constructor for our abstract class + protected Connection(Socket socket) throws IOException { + this.socket = socket; + //Creating new input and output streams using socket + inputStream = new DataInputStream(socket.getInputStream()); + outputStream = new DataOutputStream(socket.getOutputStream()); + } + + @Override + public void close() { + try { + socket.close(); + } catch (IOException ignoredException) { + } + } + + public DataOutputStream getOutputStream() { + return outputStream; + } + + public DataInputStream getInputStream() { + return inputStream; + } + + //We need this function in Tracker class in update() method + public String getHost() { + return ((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(); + } + + /* + * writeCollection function for the connection, using ReadWriteHelper writeConnection function + * we need it in methods of class TrackerConnection + */ + protected void writeCollection(Collection collection, + ReadWriteHelper.Writer writer) throws IOException { + ReadWriteHelper.writeCollection(outputStream, collection, writer); + } + + //readCollection function for the connection, using ReadWriteHelper readConnection function + protected > R readCollection(R collection, + ReadWriteHelper.Reader reader) + throws IOException { + return ReadWriteHelper.readCollection(inputStream, collection, reader); + } + + //We need this function in listenConnection function in class Tracker + public int readRequest() throws IOException { + return inputStream.readUnsignedByte(); + } +} diff --git a/src/main/java/ru/spbau/mit/FileDescriptor.java b/src/main/java/ru/spbau/mit/FileDescriptor.java new file mode 100644 index 0000000..e6c7739 --- /dev/null +++ b/src/main/java/ru/spbau/mit/FileDescriptor.java @@ -0,0 +1,112 @@ +package ru.spbau.mit; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Objects; + +/** + * Created by Анастасия on 10.04.2016. + */ +public final class FileDescriptor { + public static final int PART_SIZE = 1024 * 1024 * 10; //Size of one part of the file - 10M + + private final boolean hasFileGotId; + private final int fileId; + private final long fileSize; + private final String fileName; + + //Constructor for our class with id + public FileDescriptor(int id, long size, String name) { + fileId = id; + hasFileGotId = true; + fileSize = size; + fileName = name; + } + + //Constructor for our class without id + public FileDescriptor(long size, String name) { + hasFileGotId = false; + fileId = 0; + fileSize = size; + fileName = name; + } + + public boolean hasFileGotId() { + return hasFileGotId; + } + + public int getFileId() { + return fileId; + } + + public long getFileSize() { + return fileSize; + } + + public String getFileName() { + return fileName; + } + + public FileDescriptor setId(int id) { + return new FileDescriptor(id, fileSize, fileName); + } + + public int getNumberOfTheParts() { + /* + * We need to add PART_SIZE to fileSize because our last part can be less than 10M + * and we need to subtract 1 because our last part can be 10M + */ + return (int) (fileSize + PART_SIZE - 1) / PART_SIZE; + } + + public int getPartSize(int partId) { + /* + * If we want to get size of the any part except last part, we return 10M + * or if PART_SIZE is divisor of fileSize, we also return 10M + */ + if ((partId < getNumberOfTheParts() - 1) || (fileSize % PART_SIZE == 0)) { + return PART_SIZE; + } + return (int) fileSize % PART_SIZE; + } + + public void writeInfoToOutputStream(DataOutputStream outputStream) throws IOException { + //If the file has id, we'll write it + if (hasFileGotId) { + outputStream.writeInt(fileId); + } + //Write size of file and it's name + outputStream.writeLong(fileSize); + outputStream.writeUTF(fileName); + } + + public static FileDescriptor readInfoFromInputStream(DataInputStream inputStream, boolean hasFileGotId) + throws IOException { + //We create FileDescriptor object in two different ways, depends on has our file got id or not + if (hasFileGotId) { + int fileId = inputStream.readInt(); + long fileSize = inputStream.readLong(); + String fileName = inputStream.readUTF(); + return new FileDescriptor(fileId, fileSize, fileName); + } else { + long fileSize = inputStream.readLong(); + String fileName = inputStream.readUTF(); + return new FileDescriptor(fileSize, fileName); + } + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof FileDescriptor)) { + return false; + } + FileDescriptor that = (FileDescriptor) obj; + return this.hasFileGotId == that.hasFileGotId + && this.fileId == that.fileId + && Objects.equals(this.fileName, that.fileName) + && this.fileSize == that.fileSize; + } + + +} diff --git a/src/main/java/ru/spbau/mit/MyLock.java b/src/main/java/ru/spbau/mit/MyLock.java new file mode 100644 index 0000000..690c371 --- /dev/null +++ b/src/main/java/ru/spbau/mit/MyLock.java @@ -0,0 +1,42 @@ +package ru.spbau.mit; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.locks.Lock; + +/** + * Created by Анастасия on 11.04.2016. + */ + +//Wrapper over class Lock +public final class MyLock implements AutoCloseable { + private Lock lock; + private static final Queue locks = new ArrayDeque<>(); + + private MyLock() {}; + + //Create our own function lock returns MyLock + public static MyLock lock(Lock lock) { + MyLock res; + synchronized (locks) { + res = locks.poll(); //Set the head of the locks queue as a result (or null if the queue is empty) + } + + //If the result is null, we'll create new MyLock + if (res == null) { + res = new MyLock(); + } + res.lock = lock; + //Acquires the lock + lock.lock(); + return res; + } + + @Override + public void close() throws Exception { + lock.unlock(); + synchronized (locks) { + locks.add(this); //When we release the lock, we add current MyLock object to the locks queue + } + } +} diff --git a/src/main/java/ru/spbau/mit/PartsBitset.java b/src/main/java/ru/spbau/mit/PartsBitset.java new file mode 100644 index 0000000..283396c --- /dev/null +++ b/src/main/java/ru/spbau/mit/PartsBitset.java @@ -0,0 +1,79 @@ +package ru.spbau.mit; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; + +/** + * Created by Анастасия on 11.04.2016. + */ +public class PartsBitset { + private int cnt = 0; + private boolean[] flags; + + public PartsBitset(int size, boolean defaultValue) { + flags = new boolean[size]; + if (defaultValue) { + Arrays.fill(flags, true); + cnt = size; + } + } + + public boolean get(int pos) { + return flags[pos]; + } + + public void set(int pos, boolean value) { + if (flags[pos] == value) { + return; + } + flags[pos] = !flags[pos]; + if (flags[pos]) { + cnt++; + } else { + cnt--; + } + } + + public int getCount() { + return cnt; + } + + public void writeDataToOutputStream(DataOutputStream outputStream) throws IOException { + outputStream.writeInt(cnt); + for (int i = 0; i != flags.length; i++) { + if (get(i)) { + outputStream.writeInt(i); + } + } + } + + public static PartsBitset readDataFromInputStream(DataInputStream inputStream, int size) throws IOException { + PartsBitset result = new PartsBitset(size, false); + int count = inputStream.readInt(); + while (count > 0) { + count--; + result.set(inputStream.readInt(), true); + } + return result; + } + + public void subtract(PartsBitset other) { + assert (other.flags.length == flags.length); + for (int i = 0; i != flags.length; i++) { + if (other.get(i)) { + set(i, false); + } + } + } + + public int getFirstBitAtLeast(int pos) { + for (int i = pos; i != flags.length; i++) { + if (get(i)) { + return i; + } + } + return -1; + } +} diff --git a/src/main/java/ru/spbau/mit/PeerToPeerConnection.java b/src/main/java/ru/spbau/mit/PeerToPeerConnection.java new file mode 100644 index 0000000..31d8a8a --- /dev/null +++ b/src/main/java/ru/spbau/mit/PeerToPeerConnection.java @@ -0,0 +1,87 @@ +package ru.spbau.mit; + +import java.io.*; +import java.net.Socket; + +/** + * Created by Анастасия on 11.04.2016. + */ +public class PeerToPeerConnection extends Connection { + public static final int STAT_REQUEST = 1; + public static final int GET_REQUEST = 2; + private static final int BUFFER_SIZE = 4096; + + protected PeerToPeerConnection(Socket socket) throws IOException { + super(socket); + } + + //Methods related to stat() function + + public void sendStatRequest(int fileId) throws IOException { + DataOutputStream outputStream = getOutputStream(); + outputStream.writeByte(STAT_REQUEST); + outputStream.writeInt(fileId); + outputStream.flush(); + } + + public int readStatRequest() throws IOException { + return getInputStream().readInt(); + } + + public void writeStatResponse(PartsBitset parts) throws IOException { + DataOutputStream outputStream = getOutputStream(); + parts.writeDataToOutputStream(outputStream); + outputStream.flush(); + } + + public PartsBitset readStatResponse(int size) throws IOException { + return PartsBitset.readDataFromInputStream(getInputStream(), size); + } + + //Methods related to get() function + + public void sendGetRequest(Request request) throws IOException { + DataOutputStream outputStream = getOutputStream(); + outputStream.writeByte(GET_REQUEST); + request.writeRequestToOutputStream(outputStream); + outputStream.flush(); + } + + public Request readGetRequest() throws IOException { + return Request.readRequestFromInputStream(getInputStream()); + } + + public void writeGetResponse(RandomAccessFile from, int partId, FileDescriptor descriptor) throws IOException { + from.seek(FileDescriptor.PART_SIZE * partId); + int amount = descriptor.getPartSize(partId); + + DataOutputStream outputStream = getOutputStream(); + byte[] buffer = new byte[BUFFER_SIZE]; + while (amount > 0) { + int read = from.read(buffer, 0, Math.min(amount, BUFFER_SIZE)); + if (read == -1) { + throw new EOFException("File is shorter than recorded size."); + } + amount -= read; + outputStream.write(buffer, 0, read); + } + outputStream.flush(); + } + + public void readGetResponse(RandomAccessFile to, int partId, FileDescriptor descriptor) throws IOException { + to.seek(FileDescriptor.PART_SIZE * partId); + int amount = descriptor.getPartSize(partId); + + DataInputStream dis = getInputStream(); + byte[] buffer = new byte[BUFFER_SIZE]; + while (amount > 0) { + int read = dis.read(buffer, 0, Math.min(amount, BUFFER_SIZE)); + if (read == -1) { + throw new EOFException("Cannot read the end of the file from socket."); + } + amount -= read; + to.write(buffer, 0, read); + } + } + +} diff --git a/src/main/java/ru/spbau/mit/ReadWriteHelper.java b/src/main/java/ru/spbau/mit/ReadWriteHelper.java new file mode 100644 index 0000000..a49d491 --- /dev/null +++ b/src/main/java/ru/spbau/mit/ReadWriteHelper.java @@ -0,0 +1,68 @@ +package ru.spbau.mit; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Writer; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collection; + +/** + * Created by Анастасия on 10.04.2016. + */ + +/** + * This class contains functions that help us to read and write to/from output/input stream address and collections + */ +public abstract class ReadWriteHelper { + private static final int IP_LENGTH = 4; //We need it to read IP from the DataInputStream while reading address + + public static void writeAddress(DataOutputStream outputStream, InetSocketAddress address) throws IOException { + outputStream.write(address.getAddress().getAddress()); //Write IP address to the output stream + outputStream.writeShort(address.getPort()); //Write port number to the output stream + } + + public static InetSocketAddress readAddress(DataInputStream inputStream) throws IOException { + //Read IP address + byte[] buffer = new byte[IP_LENGTH]; + for (int i = 0; i < IP_LENGTH; i++) { + buffer[i] = inputStream.readByte(); + } + //Read port number + int port = inputStream.readUnsignedShort(); + //Create InetSocketAddress object using InetSocketAddress(InetAddress, int) constructor + return new InetSocketAddress(InetAddress.getByAddress(buffer), port); + } + + //Next interfaces are needed in reading/writing collections with parameter T + public interface Writer { + void write(DataOutputStream outputStream, T value) throws IOException; + } + + public interface Reader { + T read(DataInputStream inputStream) throws IOException; + } + + public static void writeCollection(DataOutputStream outputStream, Collection collection, + Writer writer) throws IOException { + //Write size of collection + outputStream.writeInt(collection.size()); + //Write all elements of collection + for (T element : collection) { + writer.write(outputStream, element); + } + } + + public static > R readCollection(DataInputStream inputStream, R collection, + Reader reader) throws IOException { + //Read size of collection + int collectionSize = inputStream.readInt(); + //Read all elements and add to collection + for (int i = 0; i < collectionSize; i++) { + collection.add(reader.read(inputStream)); + } + return collection; + } + +} diff --git a/src/main/java/ru/spbau/mit/Request.java b/src/main/java/ru/spbau/mit/Request.java new file mode 100644 index 0000000..17ec78a --- /dev/null +++ b/src/main/java/ru/spbau/mit/Request.java @@ -0,0 +1,38 @@ +package ru.spbau.mit; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Created by Анастасия on 10.04.2016. + */ +public class Request { + private int fileId; + private int partId; + + public Request(int fileId, int partId) { + this.fileId = fileId; + this.partId = partId; + } + + public int getFileId() { + return fileId; + } + + public int getPartId() { + return partId; + } + + public void writeRequestToOutputStream(DataOutputStream outputStream) throws IOException { + outputStream.writeInt(fileId); + outputStream.writeInt(partId); + } + + public static Request readRequestFromInputStream(DataInputStream inputStream) throws IOException { + return new Request( + inputStream.readInt(), //Read fileId + inputStream.readInt() //Read partId + ); + } +} diff --git a/src/main/java/ru/spbau/mit/RunningClient.java b/src/main/java/ru/spbau/mit/RunningClient.java new file mode 100644 index 0000000..3618561 --- /dev/null +++ b/src/main/java/ru/spbau/mit/RunningClient.java @@ -0,0 +1,364 @@ +package ru.spbau.mit; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class RunningClient extends ClientBase { + private static final long DELAY = 1000; + + public interface StatusCallbacks { + void onTrackerUpdated(boolean result, Throwable e); + + void onDownloadIssue(FileDescriptor descriptor, String message, Throwable e); + + void onDownloadStart(FileDescriptor descriptor); + + void onDownloadPart(FileDescriptor descriptor, int partId); + + void onDownloadComplete(FileDescriptor descriptor); + + void onPeerToPeerServerIssue(Throwable e); + } + + private volatile StatusCallbacks callbacks = null; + private boolean isRunning = false; + private ServerSocket serverSocket; + private ExecutorService threadPool; + private ScheduledExecutorService scheduler; + + public RunningClient(ClientInfo info) { + super(info); + } + + public void startingRun(StatusCallbacks callbacks) throws Exception { + try { + isRunning = true; + this.callbacks = callbacks; + + threadPool = Executors.newCachedThreadPool(); + scheduler = Executors.newScheduledThreadPool(1); + + // Starting download + try (MyLock myLock = MyLock.lock(info.lock.readLock())) { + for (ClientInfo.FileInfo fileInfo : info.files.values()) { + if (fileInfo.parts.getCount() == fileInfo.descriptor.getNumberOfTheParts()) { + continue; + } + threadPool.submit(() -> { + try { + download(fileInfo); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + } + + // Seeding server starts + serverSocket = new ServerSocket(0); + threadPool.submit(this::server); + + // Tracking update loop starts + scheduler.scheduleAtFixedRate(this::updateTracker, 0, DELAY, TimeUnit.MILLISECONDS); + + Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); + } catch (IOException e) { + threadPool.shutdownNow(); + scheduler.shutdownNow(); + isRunning = false; + throw e; + } + } + + public void shutdown() { + try { + if (!isRunning) { + return; + } + serverSocket.close(); + threadPool.shutdown(); + scheduler.shutdown(); + info.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + // Protocol requests + + private List sources(Collection files) throws IOException { + try (TrackerConnection connection = connectToTracker()) { + connection.sendSourcesRequest(files); + return connection.readSourcesResponse(); + } + } + + private PartsBitset stat(InetSocketAddress seeder, ClientInfo.FileInfo info) throws IOException { + try (PeerToPeerConnection connection = connectToSeeder(seeder)) { + connection.sendStatRequest(info.descriptor.getFileId()); + return connection.readStatResponse(info.descriptor.getNumberOfTheParts()); + } + } + + private void get(InetSocketAddress seeder, ClientInfo.FileInfo info, int partId) throws IOException { + try (PeerToPeerConnection connection = connectToSeeder(seeder)) { + connection.sendGetRequest(new Request(info.descriptor.getFileId(), partId)); + try (RandomAccessFile file = new RandomAccessFile(info.localPath.toString(), "rw")) { + connection.readGetResponse(file, partId, info.descriptor); + } + } + } + + // Seeding part + + private boolean update(int port) throws Exception { + List availableFiles; + try (MyLock myLock = MyLock.lock(info.lock.readLock())) { + availableFiles = info.files + .values() + .stream() + .filter(fileInfo -> { + try (MyLock myLock1 = MyLock.lock(fileInfo.fileLock.readLock())) { + + } catch (Exception e) { + e.printStackTrace(); + } + return fileInfo.parts.getCount() > 0; + }) + .map(fileInfo -> fileInfo.descriptor.getFileId()) + .collect(Collectors.toList()); + } + ClientDescriptor descriptor = new ClientDescriptor(new InetSocketAddress("", port), availableFiles); + try (TrackerConnection trackerConnection = connectToTracker()) { + trackerConnection.sendUpdateRequest(descriptor); + return trackerConnection.readUpdateResponse(); + } + } + + private void server() { + while (true) { + try { + Socket socket = serverSocket.accept(); + threadPool.submit(() -> listen(socket)); + } catch (IOException e) { + notifyPeerToPeerServerIssue(e); + break; + } + } + } + + private void listen(Socket socket) { + try (PeerToPeerConnection connection = new PeerToPeerConnection(socket)) { + int request = connection.readRequest(); + switch (request) { + case PeerToPeerConnection.STAT_REQUEST: + statRequestFunction(connection); + break; + case PeerToPeerConnection.GET_REQUEST: + getRequestFunction(connection); + break; + default: + throw new IllegalArgumentException( + String.format("Request %d from connection is incorrect!.", request) + ); + } + } catch (Exception e) { + notifyPeerToPeerServerIssue(e); + } + } + + private void statRequestFunction(PeerToPeerConnection connection) throws Exception { + int fileId = connection.readStatRequest(); + ClientInfo.FileInfo fileInfo; + try (MyLock myLock = MyLock.lock(info.lock.readLock())) { + fileInfo = info.files.get(fileId); + } + try (MyLock myLock = MyLock.lock(fileInfo.fileLock.readLock())) { + connection.writeStatResponse(fileInfo.parts); + } + } + + private void getRequestFunction(PeerToPeerConnection connection) throws Exception { + Request request = connection.readGetRequest(); + ClientInfo.FileInfo fileInfo; + try (MyLock myLock = MyLock.lock(info.lock.readLock())) { + fileInfo = info.files.get(request.getFileId()); + } + try (MyLock myLock = MyLock.lock(fileInfo.fileLock.readLock())) { + if (!fileInfo.parts.get(request.getPartId())) { + throw new IllegalArgumentException("Cannot get on missing file part."); + } + } + // We already checked that file has requested part, just read it without locking + try (RandomAccessFile file = new RandomAccessFile(fileInfo.localPath.toString(), "r")) { + connection.writeGetResponse(file, request.getPartId(), fileInfo.descriptor); + } + } + + // Leeching part + + private void updateTracker(){ + try (MyLock myLock = MyLock.lock(info.lock.readLock())) { + if (!isRunning) { + return; + } + } catch (Exception e) { + e.printStackTrace(); + } + try { + boolean result = update(serverSocket.getLocalPort()); + notifyTrackerUpdated(result, null); + } catch (Exception e) { + notifyTrackerUpdated(false, e); + } + } + + private void download(ClientInfo.FileInfo info) throws Exception { + List seeders = null; + int currentSeeder = 0; + PartsBitset seederParts = null; + int canOffer = 0; + notifyDownloadStart(info.descriptor); + while (true) { + try (MyLock myLock = MyLock.lock(info.fileLock.readLock())) { + if (!isRunning) { + return; + } + if (info.parts.getCount() == info.descriptor.getNumberOfTheParts()) { + notifyDownloadComplete(info.descriptor); + return; + } + } + + if (seeders == null || seeders.size() == 0) { + try { + seeders = sources(Collections.singletonList(info.descriptor.getFileId())); + currentSeeder = -1; + canOffer = 0; + } catch (IOException e) { + notifyDownloadIssue(info.descriptor, "Failed to fetch seeders.", e); + delay(DELAY); + continue; + } + } + if (seeders == null || seeders.size() == 0) { + notifyDownloadIssue(info.descriptor, "No seeders.", null); + delay(DELAY); + continue; + } + + if (canOffer == 0 && currentSeeder + 1 < seeders.size()) { + currentSeeder++; + try { + seederParts = stat(seeders.get(currentSeeder), info); + } catch (IOException e) { + notifyDownloadIssue(info.descriptor, String.format( + "Failed to stat seeder %s", + seeders.get(currentSeeder).toString() + ), e); + continue; + } + try (MyLock myLock = MyLock.lock(info.fileLock.readLock())) { + seederParts.subtract(info.parts); + } + canOffer = seederParts.getCount(); + } + + if (canOffer == 0) { + if (currentSeeder == seeders.size() - 1) { + seeders = null; + } + notifyDownloadIssue(info.descriptor, "No one seed remaining parts.", null); + delay(DELAY); + continue; + } + + int partId = 0; + if (canOffer > 0) { + partId = seederParts.getFirstBitAtLeast(partId); + try { + get(seeders.get(currentSeeder), info, partId); + } catch (IOException e) { + notifyDownloadIssue(info.descriptor, String.format( + "Downloading error: part %d from seeder %s.", + partId, + seeders.get(currentSeeder).toString() + ), e); + delay(DELAY); + } + boolean needUpdateTracker = false; + try (MyLock myLock = MyLock.lock(info.fileLock.writeLock())) { + info.parts.set(partId, true); + if (info.parts.getCount() == 1) { + needUpdateTracker = true; + } + } + seederParts.set(partId, false); + canOffer--; + if (needUpdateTracker) { + updateTracker(); + } + notifyDownloadPart(info.descriptor, partId); + } + } + } + + private PeerToPeerConnection connectToSeeder(InetSocketAddress seeder) throws IOException { + return new PeerToPeerConnection(new Socket(seeder.getAddress(), seeder.getPort())); + } + + private void delay(long time) { + try { + Thread.sleep(time); + } catch (InterruptedException ignored) { + } + } + + private void notifyTrackerUpdated(boolean result, Throwable e) { + if (callbacks != null) { + callbacks.onTrackerUpdated(result, e); + } + } + + private void notifyDownloadIssue(FileDescriptor descriptor, String message, Throwable e) { + if (callbacks != null) { + callbacks.onDownloadIssue(descriptor, message, e); + } + } + + private void notifyDownloadComplete(FileDescriptor descriptor) { + if (callbacks != null) { + callbacks.onDownloadComplete(descriptor); + } + } + + private void notifyPeerToPeerServerIssue(Throwable e) { + if (callbacks != null) { + callbacks.onPeerToPeerServerIssue(e); + } + } + + private void notifyDownloadStart(FileDescriptor descriptor) { + if (callbacks != null) { + callbacks.onDownloadStart(descriptor); + } + } + + private void notifyDownloadPart(FileDescriptor descriptor, int partId) { + if (callbacks != null) { + callbacks.onDownloadPart(descriptor, partId); + } + } + +} diff --git a/src/main/java/ru/spbau/mit/TorrentGUIListDialog.java b/src/main/java/ru/spbau/mit/TorrentGUIListDialog.java new file mode 100644 index 0000000..004a3c1 --- /dev/null +++ b/src/main/java/ru/spbau/mit/TorrentGUIListDialog.java @@ -0,0 +1,148 @@ +package ru.spbau.mit; + +import org.apache.commons.io.FileUtils; + +import javax.swing.*; +import javax.swing.table.AbstractTableModel; +import java.awt.*; +import java.awt.event.ActionEvent; +import java.awt.event.WindowAdapter; +import java.awt.event.WindowEvent; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +public class TorrentGUIListDialog extends JDialog { + private enum Columns { + //ID, + NAME, + SIZE, + } + + private static final Map COLUMNS_NAMES = new EnumMap<>(Columns.class); + + static { + COLUMNS_NAMES.put(Columns.NAME, "File name"); + COLUMNS_NAMES.put(Columns.SIZE, "File size"); + } + + private static final class TableModel extends AbstractTableModel { + private volatile List data; + + private TableModel(List data) { + this.data = data; + } + + @Override + public int getRowCount() { + return data.size(); + } + + @Override + public int getColumnCount() { + return Columns.values().length; + } + + @Override + public String getColumnName(int column) { + return COLUMNS_NAMES.get(Columns.values()[column]); + } + + @Override + public Object getValueAt(int rowIndex, int columnIndex) { + switch (Columns.values()[columnIndex]) { + case NAME: + return data.get(rowIndex).getFileName(); + case SIZE: + return FileUtils.byteCountToDisplaySize(data.get(rowIndex).getFileSize()); + } + return null; + } + + @Override + public Class getColumnClass(int columnIndex) { + switch (Columns.values()[columnIndex]) { + case NAME: + case SIZE: + return String.class; + } + return null; + } + } + + private JTable table; + private Integer result = null; + + private Action selectAction = new AbstractAction() { + { + putValue(NAME, "Select"); + setEnabled(false); + } + + @Override + public void actionPerformed(ActionEvent event) { + result = table.getSelectedRow(); + dispose(); + } + }; + + private Action cancelAction = new AbstractAction() { + { + putValue(NAME, "Cancel"); + } + + @Override + public void actionPerformed(ActionEvent event) { + result = null; + dispose(); + } + }; + + public TorrentGUIListDialog(Frame owner, List data) { + super(owner, "Select file", DEFAULT_MODALITY_TYPE); + super.setLocationRelativeTo(owner); + + table = new JTable(new TableModel(data)); + table.setAlignmentX(Component.LEFT_ALIGNMENT); + table.setColumnSelectionAllowed(false); + table.setRowSelectionAllowed(true); + table.setCellSelectionEnabled(false); + table.setSelectionMode(ListSelectionModel.SINGLE_SELECTION); + table.getSelectionModel().addListSelectionListener(event -> { + int viewRow = table.getSelectedRow(); + selectAction.setEnabled(viewRow >= 0); + }); + + addWindowListener(new WindowAdapter() { + public void windowClosing(WindowEvent e) { + result = null; + } + }); + + + JPanel topPanel = new JPanel(); + topPanel.setLayout(new BoxLayout(topPanel, BoxLayout.PAGE_AXIS)); + topPanel.setBorder(BorderFactory.createEmptyBorder(10, 10, 10, 10)); + topPanel.add(new JLabel("Select file to download:")); + topPanel.add(Box.createRigidArea(new Dimension(0, 5))); + topPanel.add(new JScrollPane(table)); + + JPanel bottomPanel = new JPanel(); + bottomPanel.setLayout(new BoxLayout(bottomPanel, BoxLayout.LINE_AXIS)); + bottomPanel.setBorder(BorderFactory.createEmptyBorder(0, 10, 10, 10)); + bottomPanel.add(Box.createHorizontalGlue()); + bottomPanel.add(new JButton(cancelAction)); + bottomPanel.add(Box.createRigidArea(new Dimension(10, 0))); + bottomPanel.add(new JButton(selectAction)); + + setLayout(new BorderLayout()); + add(topPanel, BorderLayout.CENTER); + add(bottomPanel, BorderLayout.PAGE_END); + pack(); + } + + public Integer showDialog() { + setVisible(true); + return result; + } +} diff --git a/src/main/java/ru/spbau/mit/TorrentGUIMain.java b/src/main/java/ru/spbau/mit/TorrentGUIMain.java new file mode 100644 index 0000000..4a0efb7 --- /dev/null +++ b/src/main/java/ru/spbau/mit/TorrentGUIMain.java @@ -0,0 +1,410 @@ +package ru.spbau.mit; + +import org.apache.commons.io.FileUtils; + +import javax.swing.*; +import javax.swing.table.AbstractTableModel; +import javax.swing.table.TableCellRenderer; +import java.awt.*; +import java.awt.event.*; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public final class TorrentGUIMain { + private enum Columns { + NAME, + LOCAL_PATH, + SIZE, + PROGRESS + } + + private static final Map COLUMNS_NAMES = new EnumMap<>(Columns.class); + + static { + COLUMNS_NAMES.put(Columns.NAME, "File name"); + COLUMNS_NAMES.put(Columns.LOCAL_PATH, "Local path"); + COLUMNS_NAMES.put(Columns.SIZE, "File size"); + COLUMNS_NAMES.put(Columns.PROGRESS, "Progress"); + } + + private static final class TableRow { + private String name; + private String localPath; + private String size; + private double progress; + + private TableRow(ClientInfo.FileInfo info) { + try (MyLock myLock = MyLock.lock(info.fileLock.readLock())) { + name = info.descriptor.getFileName(); + localPath = info.localPath.toString(); + size = FileUtils.byteCountToDisplaySize(info.descriptor.getFileSize()); + progress = (info.parts.getCount() + 0.0) / info.descriptor.getNumberOfTheParts(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private static final class TableModel extends AbstractTableModel { + private volatile List data = Collections.emptyList(); + + @Override + public int getRowCount() { + return data.size(); + } + + @Override + public int getColumnCount() { + return Columns.values().length; + } + + @Override + public String getColumnName(int column) { + return COLUMNS_NAMES.get(Columns.values()[column]); + } + + @Override + public Object getValueAt(int rowIndex, int columnIndex) { + switch (Columns.values()[columnIndex]) { + case NAME: + return data.get(rowIndex).name; + case LOCAL_PATH: + return data.get(rowIndex).localPath; + case SIZE: + return data.get(rowIndex).size; + case PROGRESS: + return data.get(rowIndex).progress; + } + return null; + } + + @Override + public Class getColumnClass(int columnIndex) { + switch (Columns.values()[columnIndex]) { + case NAME: + case LOCAL_PATH: + case SIZE: + return String.class; + case PROGRESS: + return Double.TYPE; + } + return null; + } + + private void setData(List newData) { + data = newData; + fireTableDataChanged(); + } + } + + private static final class ProgressRenderer implements TableCellRenderer { + private static final int SCALE = 1000; + + private JProgressBar progressBar = new JProgressBar(0, SCALE); + + private ProgressRenderer() { + progressBar.setStringPainted(true); + progressBar.setMinimum(0); + progressBar.setMaximum(SCALE); + } + + @Override + public Component getTableCellRendererComponent( + JTable table, Object value, + boolean isSelected, boolean hasFocus, + int row, int column + ) { + progressBar.setValue((int) ((double) value * SCALE)); + return progressBar; + } + } + + private JFileChooser fileChooser = new JFileChooser(); + private TableModel tableModel; + private JTextArea logArea; + private ClientInfo info; + private RunningClient runningClient; + private JFrame frame; + + private void writeMessage(String format, Object... args) { + SwingUtilities.invokeLater(() -> logArea.append(String.format(format + "\n", args))); + } + + private final RunningClient.StatusCallbacks callbacks = new RunningClient.StatusCallbacks() { + @Override + public void onTrackerUpdated(boolean result, Throwable e) { + if (!result) { + writeMessage("Failed to update tracker: %s ", e.getMessage()); + } + } + + @Override + public void onDownloadIssue(FileDescriptor descriptor, String message, Throwable e) { + writeMessage( + "File \"%s\" (#%d) download issue: %s (%s)", + descriptor.getFileName(), + descriptor.getFileId(), + message, + (e == null) ? "" : e.getMessage() + ); + } + + @Override + public void onDownloadStart(FileDescriptor descriptor) { + writeMessage("Starting file \"%s\" (#%d) download", descriptor.getFileName(), + descriptor.getFileId()); + } + + @Override + public void onDownloadPart(FileDescriptor descriptor, int partId) { + fetchModel(); + } + + @Override + public void onDownloadComplete(FileDescriptor descriptor) { + writeMessage("File \"%s\" (#%d) downloaded", descriptor.getFileName(), descriptor.getFileId()); + } + + @Override + public void onPeerToPeerServerIssue(Throwable e) { + writeMessage("Seeding server issue: %s", e.getMessage()); + } + }; + + private final Action newFileAction = new AbstractAction() { + { + putValue(NAME, "New file"); + putValue(SHORT_DESCRIPTION, "Upload new file to tracker"); + putValue(MNEMONIC_KEY, KeyEvent.VK_N); + putValue(ACCELERATOR_KEY, KeyStroke.getKeyStroke(KeyEvent.VK_N, InputEvent.CTRL_MASK)); + } + + @Override + public void actionPerformed(ActionEvent event) { + SwingUtilities.invokeLater(() -> { + int ret = fileChooser.showOpenDialog(frame); + if (ret == JFileChooser.APPROVE_OPTION) { + Path p = fileChooser.getSelectedFile().toPath(); + try { + new Client(info).newFile(p); + fetchModel(); + } catch (IOException e) { + showErrorDialog("Failed to upload file: " + e.getMessage()); + } + } + }); + } + }; + + private final Action getFileAction = new AbstractAction() { + { + putValue(NAME, "Get file"); + putValue(SHORT_DESCRIPTION, "Download new file from peers"); + putValue(MNEMONIC_KEY, KeyEvent.VK_D); + putValue(ACCELERATOR_KEY, KeyStroke.getKeyStroke(KeyEvent.VK_D, InputEvent.CTRL_MASK)); + } + + @Override + public void actionPerformed(ActionEvent event) { + SwingUtilities.invokeLater(() -> { + List files; + try { + files = new Client(info).list(); + } catch (IOException e) { + showErrorDialog(String.format("Failed to get file list from tracker: %s\n", e.getMessage())); + return; + } + try (MyLock myLock = MyLock.lock(info.lock.readLock())) { + for (ClientInfo.FileInfo fileInfo : info.files.values()) { + files.remove(fileInfo.descriptor); + } + } catch (Exception e) { + e.printStackTrace(); + } + Integer result = new TorrentGUIListDialog(frame, files).showDialog(); + try { + if (result != null) { + new Client(info).get(files.get(result).getFileId()); + fetchModel(); + } + } catch (Exception e) { + showErrorDialog(String.format("Failed to add file to downloads list: %s\n", e.getMessage())); + } + }); + } + }; + + private final Action startRunAction = new AbstractAction() { + { + putValue(NAME, "Start running"); + putValue(SHORT_DESCRIPTION, "Start seeding and downloading files"); + putValue(MNEMONIC_KEY, KeyEvent.VK_R); + putValue(ACCELERATOR_KEY, KeyStroke.getKeyStroke(KeyEvent.VK_R, InputEvent.CTRL_MASK)); + } + + @Override + public void actionPerformed(ActionEvent event) { + try { + runningClient = new RunningClient(info); + runningClient.startingRun(callbacks); + + writeMessage("Started running"); + setEnabled(false); + stopRunAction.setEnabled(true); + newFileAction.setEnabled(false); + getFileAction.setEnabled(false); + } catch (Exception e) { + e.printStackTrace(); + showErrorDialog("Failed to run client: " + e.getMessage()); + runningClient = null; + } + } + }; + + private final Action stopRunAction = new AbstractAction() { + { + putValue(NAME, "Stop running"); + putValue(SHORT_DESCRIPTION, "Stop seeding and downloading files"); + putValue(MNEMONIC_KEY, KeyEvent.VK_R); + putValue(ACCELERATOR_KEY, KeyStroke.getKeyStroke(KeyEvent.VK_R, InputEvent.CTRL_MASK)); + setEnabled(false); + } + + @Override + public void actionPerformed(ActionEvent e) { + runningClient.shutdown(); + writeMessage("Stopped running"); + setEnabled(false); + startRunAction.setEnabled(true); + newFileAction.setEnabled(true); + getFileAction.setEnabled(true); + } + }; + + private final Action closeAction = new AbstractAction() { + { + putValue(NAME, "Close"); + putValue(SHORT_DESCRIPTION, "Exit application"); + putValue(MNEMONIC_KEY, KeyEvent.VK_Q); + putValue(ACCELERATOR_KEY, KeyStroke.getKeyStroke(KeyEvent.VK_Q, InputEvent.CTRL_MASK)); + } + + @Override + public void actionPerformed(ActionEvent event) { + close(); + } + }; + + private TorrentGUIMain() { + tableModel = new TableModel(); + buildUI(); + try { + info = new ClientInfo(Paths.get("")); + fetchModel(); + } catch (IOException e) { + e.printStackTrace(); + showErrorDialog("Client info: " + e.getMessage()); + ClientInfo.removeInfo(Paths.get("")); + close(); + } + } + + public static void main(String[] args) { + new TorrentGUIMain(); + } + + private void fetchModel() { + SwingUtilities.invokeLater(() -> { + List data; + try (MyLock myLock = MyLock.lock(info.lock.readLock())) { + data = info.files.values() + .stream() + .map(TableRow::new) + .collect(Collectors.toList()); + tableModel.setData(data); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + + private void close() { + frame.dispose(); + if (runningClient != null) { + runningClient.shutdown(); + } + if (info != null) { + try { + info.close(); + } catch (Exception e) { + showErrorDialog("Failed to save info: " + e.getMessage()); + } + } + System.exit(0); + } + + private void buildUI() { + frame = new JFrame("Torrent client"); + frame.setDefaultCloseOperation(WindowConstants.DISPOSE_ON_CLOSE); + frame.addWindowListener(new WindowAdapter() { + @Override + public void windowClosed(WindowEvent event) { + close(); + } + }); + + JTable table = new JTable(tableModel); + table.setFillsViewportHeight(true); + table.setRowSelectionAllowed(false); + table.setColumnSelectionAllowed(false); + table.setCellSelectionEnabled(false); + table.getColumn(COLUMNS_NAMES.get(Columns.NAME)).setMinWidth(200); + table.getColumn(COLUMNS_NAMES.get(Columns.NAME)).setMaxWidth(400); + table.getColumn(COLUMNS_NAMES.get(Columns.LOCAL_PATH)).setMinWidth(200); + table.getColumn(COLUMNS_NAMES.get(Columns.SIZE)).setMinWidth(50); + table.getColumn(COLUMNS_NAMES.get(Columns.SIZE)).setMaxWidth(75); + table.getColumn(COLUMNS_NAMES.get(Columns.PROGRESS)).setMinWidth(50); + table.getColumn(COLUMNS_NAMES.get(Columns.PROGRESS)).setMaxWidth(150); + table.getColumn(COLUMNS_NAMES.get(Columns.PROGRESS)).setCellRenderer(new ProgressRenderer()); + + logArea = new JTextArea(); + logArea.setEditable(false); + logArea.setMinimumSize(new Dimension(200, 100)); + + JMenuBar menuBar = new JMenuBar(); + { + JMenu fileMenu = new JMenu("File"); + fileMenu.add(newFileAction); + fileMenu.add(getFileAction); + fileMenu.addSeparator(); + fileMenu.add(startRunAction); + fileMenu.add(stopRunAction); + fileMenu.addSeparator(); + fileMenu.add(closeAction); + menuBar.add(fileMenu); + } + + SwingUtilities.invokeLater(() -> { + frame.setLayout(new BoxLayout(frame.getContentPane(), BoxLayout.PAGE_AXIS)); + frame.setJMenuBar(menuBar); + frame.add(new JScrollPane(table)); + frame.add(new JScrollPane(logArea)); + frame.setMinimumSize(new Dimension(700, 600)); + frame.pack(); + frame.setLocationRelativeTo(null); + frame.setVisible(true); + }); + } + + private void showErrorDialog(String message) { + JOptionPane.showMessageDialog(frame, message, "Error", JOptionPane.ERROR_MESSAGE); + writeMessage("Error: " + message); + } + +} diff --git a/src/main/java/ru/spbau/mit/Tracker.java b/src/main/java/ru/spbau/mit/Tracker.java new file mode 100644 index 0000000..9013e4f --- /dev/null +++ b/src/main/java/ru/spbau/mit/Tracker.java @@ -0,0 +1,192 @@ +package ru.spbau.mit; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +/** + * Created by Анастасия on 11.04.2016. + */ +public class Tracker implements AutoCloseable { + private static final String STATE_FILE = "tracker-state.dat"; + + private Path workingDirectory; + private ExecutorService threadPool; + private ScheduledExecutorService scheduler; + private ServerSocket serverSocket; + private List files; + private Map> seeders; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + public Tracker(Path workingDirectory) throws Exception { + try (MyLock myLock = MyLock.lock(lock.writeLock())) { + this.workingDirectory = workingDirectory; + serverSocket = new ServerSocket(TrackerConnection.PORT); + threadPool = Executors.newCachedThreadPool(); + scheduler = Executors.newScheduledThreadPool(1); + load(); + } + threadPool.submit(this::work); + } + + @Override + public void close() throws Exception { + try (MyLock myLock = MyLock.lock(lock.writeLock())) { + serverSocket.close(); + } + threadPool.shutdown(); + scheduler.shutdown(); + store(); + } + + private Socket accept() throws Exception { + try (MyLock myLock = MyLock.lock(lock.readLock())) { + if (serverSocket.isClosed()) { + return null; + } + } + return serverSocket.accept(); + } + + private void work() { + while (true) { + try { + Socket socket = accept(); + if (socket == null) { + return; + } + //We listen the connection for a request byte + threadPool.submit(() -> listenConnection(socket)); + } catch (Exception e) { + e.printStackTrace(); + break; + } + } + } + + private void listenConnection(Socket socket) { + try (TrackerConnection connection = new TrackerConnection(socket)) { + //We read request byte and choose right function + int request = connection.readRequest(); + switch (request) { + case TrackerConnection.LIST_REQUEST: + list(connection); + break; + case TrackerConnection.UPLOAD_REQUEST: + upload(connection); + break; + case TrackerConnection.SOURCES_REQUEST: + sources(connection); + break; + case TrackerConnection.UPDATE_REQUEST: + update(connection); + break; + default: + System.err.printf("Request: %d is not correct!\n", request); + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void load() throws IOException { + Path path = workingDirectory.resolve(STATE_FILE); + if (Files.exists(path)) { + try (DataInputStream inputStream = new DataInputStream(Files.newInputStream(path))) { + files = ReadWriteHelper.readCollection(inputStream, new ArrayList<>(), + inputStream1 -> FileDescriptor.readInfoFromInputStream(inputStream1, true)); + } + } else { + files = new ArrayList<>(); + } + seeders = new HashMap<>(); + } + + private void store() throws IOException { + Path path = workingDirectory.resolve(STATE_FILE); + if (!Files.exists(path)) { + Files.createDirectories(workingDirectory); + Files.createFile(path); + } + try (DataOutputStream outputStream = new DataOutputStream(Files.newOutputStream(path))) { + ReadWriteHelper.writeCollection(outputStream, files, + (outputStream1, w) -> w.writeInfoToOutputStream(outputStream1)); + } + } + + private void list(TrackerConnection connection) throws Exception { + try (MyLock myLock = MyLock.lock(lock.readLock())) { + connection.writeListResponse(files); + } + } + + private void upload(TrackerConnection connection) throws Exception { + FileDescriptor fileDescriptor = connection.readUploadRequest(); + try (MyLock myLock = MyLock.lock(lock.writeLock())) { + int newId = files.size(); + fileDescriptor = fileDescriptor.setId(newId); + files.add(fileDescriptor); + } + connection.writeUploadResponse(fileDescriptor.getFileId()); + } + + private void sources(TrackerConnection connection) throws Exception { + List request = connection.readSourcesRequest(); + List result; + try (MyLock myLock = MyLock.lock(lock.readLock())) { + result = request.stream() + .flatMap(i -> seeders + .getOrDefault(i, Collections.emptySet()) + .stream() + ) + .distinct() + .map(ClientDescriptor::getAddress) + .collect(Collectors.toList()); + } + connection.writeSourcesResponse(result); + } + + private void update(TrackerConnection connection) throws Exception { + ClientDescriptor receivedClientDescriptor = connection.readUpdateRequest(); + ClientDescriptor clientDescriptor = new ClientDescriptor( + new InetSocketAddress(connection.getHost(), receivedClientDescriptor.getAddress().getPort()), + receivedClientDescriptor.getIdList() + ); + try (MyLock myLock = MyLock.lock(lock.writeLock())) { + for (int id : clientDescriptor.getIdList()) { + if (seeders.get(id) == null) { + seeders.put(id, new HashSet<>()); + } + seeders.get(id).add(clientDescriptor); + } + } + + scheduler.schedule(() -> { + try (MyLock myLock = MyLock.lock(lock.writeLock())) { + for (int id : clientDescriptor.getIdList()) { + seeders.get(id).remove(clientDescriptor); + } + } catch (Exception e) { + e.printStackTrace(); + } + }, TrackerConnection.UPDATE_DELAY, TimeUnit.MILLISECONDS); + + connection.writeUpdateResponse(true); + } + + +} diff --git a/src/main/java/ru/spbau/mit/TrackerConnection.java b/src/main/java/ru/spbau/mit/TrackerConnection.java new file mode 100644 index 0000000..4d4b834 --- /dev/null +++ b/src/main/java/ru/spbau/mit/TrackerConnection.java @@ -0,0 +1,128 @@ +package ru.spbau.mit; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Created by Анастасия on 11.04.2016. + */ +public class TrackerConnection extends Connection { + + public static final int PORT = 8081; + public static final int UPDATE_DELAY = 60 * 1000; + + public static final int LIST_REQUEST = 1; + public static final int UPLOAD_REQUEST = 2; + public static final int SOURCES_REQUEST = 3; + public static final int UPDATE_REQUEST = 4; + + //Constructor for our class same as constructor for Connection class + protected TrackerConnection(Socket socket) throws IOException { + super(socket); + } + + //Methods related to list() function + + public void sendListRequest() throws IOException { + DataOutputStream outputStream = getOutputStream(); + outputStream.writeByte(LIST_REQUEST); + outputStream.flush(); + } + + public void writeListResponse(Collection files) throws IOException { + writeCollection(files, (outputStream, fileDescriptor) -> { + if (!fileDescriptor.hasFileGotId()) { + throw new IllegalStateException("Uploaded files must have id."); + } + fileDescriptor.writeInfoToOutputStream(outputStream); + }); + getOutputStream().flush(); + } + + public List readListResponse() throws IOException { + return readCollection(new ArrayList<>(), + (inputStream) -> FileDescriptor.readInfoFromInputStream(inputStream, true)); + } + + //Methods related to upload() function + + public void sendUploadRequest(FileDescriptor file) throws IOException { + if (file.hasFileGotId()) { + throw new IllegalStateException("File, we want to upload, can't have id!"); + } + DataOutputStream outputStream = getOutputStream(); + outputStream.writeByte(UPLOAD_REQUEST); + file.writeInfoToOutputStream(outputStream); + outputStream.flush(); + } + + public FileDescriptor readUploadRequest() throws IOException { + return FileDescriptor.readInfoFromInputStream(getInputStream(), false); + } + + public void writeUploadResponse(int fileId) throws IOException { + DataOutputStream outputStream = getOutputStream(); + outputStream.writeInt(fileId); + outputStream.flush(); + } + + public int readUploadResponse() throws IOException { + return getInputStream().readInt(); + } + + //Methods related to sources() function + + public void sendSourcesRequest(Collection idList) throws IOException { + if (idList.size() == 0) { + throw new IllegalStateException("Error! idList is empty!"); + } + DataOutputStream outputStream = getOutputStream(); + outputStream.writeByte(SOURCES_REQUEST); + writeCollection(idList, DataOutputStream::writeInt); + outputStream.flush(); + } + + public List readSourcesRequest() throws IOException { + return readCollection(new ArrayList<>(), DataInputStream::readInt); + } + + public void writeSourcesResponse(Collection addresses) throws IOException { + writeCollection(addresses, ReadWriteHelper::writeAddress); + } + + public List readSourcesResponse() throws IOException { + return readCollection(new ArrayList<>(), ReadWriteHelper::readAddress); + } + + //Methods related to update() function + + public void sendUpdateRequest(ClientDescriptor clientDescriptor) throws IOException { + if (clientDescriptor.getIdList().size() == 0) { + throw new IllegalStateException("Error! idList is empty!"); + } + DataOutputStream outputStream = getOutputStream(); + outputStream.writeByte(UPDATE_REQUEST); + clientDescriptor.writeInfoToOutputStream(outputStream); + outputStream.flush(); + } + + public ClientDescriptor readUpdateRequest() throws IOException { + return ClientDescriptor.readInfoFromInputStream(getInputStream()); + } + + public void writeUpdateResponse(boolean isSuccessful) throws IOException { + DataOutputStream outputStream = getOutputStream(); + outputStream.writeBoolean(isSuccessful); + outputStream.flush(); + } + + public boolean readUpdateResponse() throws IOException { + return getInputStream().readBoolean(); + } +} diff --git a/src/main/java/ru/spbau/mit/TrackerMain.java b/src/main/java/ru/spbau/mit/TrackerMain.java new file mode 100644 index 0000000..7cc5685 --- /dev/null +++ b/src/main/java/ru/spbau/mit/TrackerMain.java @@ -0,0 +1,22 @@ +package ru.spbau.mit; + +import java.io.IOException; +import java.nio.file.Paths; + +public abstract class TrackerMain { + public static void main(String[] args) { + try { + Tracker tracker = new Tracker(Paths.get("")); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + tracker.close(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/client-01 b/src/test/client-01 new file mode 100644 index 0000000..8f2533c --- /dev/null +++ b/src/test/client-01 @@ -0,0 +1 @@ +client111 \ No newline at end of file diff --git a/src/test/client-02 b/src/test/client-02 new file mode 100644 index 0000000..b6cb5f9 --- /dev/null +++ b/src/test/client-02 @@ -0,0 +1 @@ +client111 diff --git a/src/test/client-03 b/src/test/client-03 new file mode 100644 index 0000000..b6cb5f9 --- /dev/null +++ b/src/test/client-03 @@ -0,0 +1 @@ +client111 diff --git a/src/test/java/ru/spbau/mit/Tests.java b/src/test/java/ru/spbau/mit/Tests.java new file mode 100644 index 0000000..e550b0b --- /dev/null +++ b/src/test/java/ru/spbau/mit/Tests.java @@ -0,0 +1,204 @@ +package ru.spbau.mit; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.*; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +/** + * Created by Анастасия on 11.04.2016. + */ +public class Tests { + private static final Path EXAMPLE_PATH = Paths.get("src", "test", "mainFile"); + private static final Path TRACKER_DIR = Paths.get("test", "tracker"); + private static final Path CLIENT1_DIR = Paths.get("test", "client-01"); + private static final Path CLIENT2_DIR = Paths.get("test", "client-02"); + private static final Path CLIENT3_DIR = Paths.get("test", "client-03"); + private static final long TIME_LIMIT = 70 * 1000L; + + @Test + public void listAndUploadTest() throws Throwable { + try ( + Tracker tracker = new Tracker(TRACKER_DIR); + ClientInfo clientInfo1 = new ClientInfo("localhost", CLIENT1_DIR); + ClientInfo clientInfo2 = new ClientInfo("localhost", CLIENT2_DIR) + ) { + Client client1 = new Client(clientInfo1); + Client client2 = new Client(clientInfo2); + assertAllCollectionEquals(Collections.emptyList(), client1.list(), client2.list()); + + FileDescriptor descriptor1 = client1.newFile(EXAMPLE_PATH); + FileDescriptor descriptor2 = client2.newFile(EXAMPLE_PATH); + assertNotEquals("Ids should be different", descriptor1.getFileId(), descriptor2.getFileId()); + + assertAllCollectionEquals(Arrays.asList(descriptor1, descriptor2), client1.list(), client2.list()); + } + } + + @Test + public void listConsistencyTest() throws Throwable { + try (ClientInfo clientInfo1 = new ClientInfo("localhost", CLIENT1_DIR)) { + Client client1 = new Client(clientInfo1); + FileDescriptor descriptor; + try (Tracker tracker = new Tracker(TRACKER_DIR)) { + descriptor = client1.newFile(EXAMPLE_PATH); + } + + List list; + try (Tracker tracker = new Tracker(TRACKER_DIR)) { + list = client1.list(); + } + assertEquals(Collections.singletonList(descriptor), list); + } + } + + @Test(timeout = TIME_LIMIT) + public void testDownload() throws Throwable { + final DownloadWaiter waiter2 = new DownloadWaiter(); + final DownloadWaiter waiter3 = new DownloadWaiter(); + FileDescriptor descriptor; + try ( + Tracker tracker = new Tracker(TRACKER_DIR); + ClientInfo clientInfo2 = new ClientInfo("localhost", CLIENT2_DIR) + ) { + Client client2 = new Client(clientInfo2); + RunningClient runningClient2 = new RunningClient(clientInfo2); + + try (ClientInfo clientInfo1 = new ClientInfo("localhost", CLIENT1_DIR)) { + Client client1 = new Client(clientInfo1); + descriptor = client1.newFile(EXAMPLE_PATH); + assertTrue(client2.get(descriptor.getFileId())); + RunningClient runningClient1 = new RunningClient(clientInfo1); + // Seeding + runningClient1.startingRun(null); + // Leeching + runningClient2.startingRun(waiter2); + + //System.err.println("after starting run"); + + synchronized (waiter2) { + while (!waiter2.ready) { + waiter2.wait(); + } + } + //System.err.println("before first shutdown"); + runningClient1.shutdown(); + //System.err.println("after first shutdown"); + } + + //Testing that now client2 seeding + try (ClientInfo clientInfo3 = new ClientInfo("localhost", CLIENT3_DIR)) { + Client client3 = new Client(clientInfo3); + assertTrue(client3.get(descriptor.getFileId())); + + RunningClient runningClient3 = new RunningClient(clientInfo3); + //And leeching + runningClient3.startingRun(waiter3); + synchronized (waiter3) { + while (!waiter3.ready) { + waiter3.wait(); + } + } + } + + runningClient2.shutdown(); + } + + //System.err.println("before downloaded path"); + Path downloadedPath = Paths.get( + "downloads", + Integer.toString(descriptor.getFileId()), + EXAMPLE_PATH.getFileName().toString() + ); + assertTrue("Downloaded file is different!", FileUtils.contentEquals( + EXAMPLE_PATH.toFile(), + CLIENT2_DIR.resolve(downloadedPath).toFile() + )); + /*assertTrue("Downloaded file is different!", FileUtils.contentEquals( + EXAMPLE_PATH.toFile(), + CLIENT3_DIR.resolve(downloadedPath).toFile() + ));*/ + } + + @Before + @After + public void clear() throws IOException { + clearDirectory("test"); + } + + private void clearDirectory(String name) throws IOException { + Path path = Paths.get(name); + if (!Files.exists(path)) { + return; + } + + Files.walkFileTree(path, new Deleter()); + } + + private void assertAllCollectionEquals(Collection expected, Collection... others) { + for (Collection other : others) { + assertEquals(expected, other); + } + } + + private static class Deleter extends SimpleFileVisitor { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + Files.delete(file); + return super.visitFile(file, attrs); + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + Files.delete(dir); + return super.postVisitDirectory(dir, exc); + } + } + + private static final class DownloadWaiter implements RunningClient.StatusCallbacks { + private boolean ready = false; + + private DownloadWaiter() { + } + + @Override + public void onTrackerUpdated(boolean result, Throwable e) { + } + + @Override + public void onDownloadIssue(FileDescriptor descriptor, String message, Throwable e) { + } + + @Override + public void onDownloadStart(FileDescriptor descriptor) { + } + + @Override + public void onDownloadPart(FileDescriptor descriptor, int partId) { + } + + @Override + public void onDownloadComplete(FileDescriptor descriptor) { + synchronized (this) { + ready = true; + notify(); + } + } + + @Override + public void onPeerToPeerServerIssue(Throwable e) { + } + } +} diff --git a/src/test/mainFile b/src/test/mainFile new file mode 100644 index 0000000..ba2906d --- /dev/null +++ b/src/test/mainFile @@ -0,0 +1 @@ +main diff --git a/src/test/tracker b/src/test/tracker new file mode 100644 index 0000000..05a682b --- /dev/null +++ b/src/test/tracker @@ -0,0 +1 @@ +Hello! \ No newline at end of file