diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..79feeb7
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,57 @@
+
+ 4.0.0
+
+ ru.spbau.aastarkova
+ homeworks
+ 1.0-SNAPSHOT
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ com.google.guava
+ guava
+ 19.0
+
+
+ commons-io
+ commons-io
+ 2.4
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.3
+
+ 1.8
+ 1.8
+
+ -Xlint:all
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.3
+
+ 1.8
+ 1.8
+
+ -Xlint:all
+
+
+
+
+
+
diff --git a/src/main/java/ru/spbau/mit/Client.java b/src/main/java/ru/spbau/mit/Client.java
new file mode 100644
index 0000000..d8e11a7
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/Client.java
@@ -0,0 +1,503 @@
+package ru.spbau.mit;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * Created by Анастасия on 11.04.2016.
+ */
+public class Client implements AutoCloseable {
+ private static final long DELAY = 1000;
+ private static final String info_FILE = "client-info.dat";
+ private static final String DOWNLOADS_DIR = "downloads";
+
+ private Path workingDirectory;
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
+ private Map files;
+ private String host;
+
+ private StatusCallbacks callbacks = null;
+
+ // For run() function
+ private boolean isRunning = false;
+ private ServerSocket serverSocket;
+ private ExecutorService threadPool;
+ private ScheduledExecutorService scheduler;
+
+ public Client(String host, Path workingDirectory) throws IOException {
+ this.host = host;
+ this.workingDirectory = workingDirectory;
+ load();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (isRunning) {
+ try (MyLock myLock = MyLock.lock(lock.writeLock())) {
+ isRunning = false;
+ serverSocket.close();
+ }
+ threadPool.shutdown();
+ scheduler.shutdown();
+ }
+ store();
+ }
+
+ private static final class FileInfo {
+ private ReadWriteLock fileLock = new ReentrantReadWriteLock();
+ private FileDescriptor descriptor;
+ private PartsBitset parts;
+ private Path localPath;
+
+ private FileInfo(FileDescriptor descriptor, Path localPath, Path workingDirectory) throws IOException {
+ this(descriptor, new PartsBitset(descriptor.getNumberOfTheParts(), localPath != null),
+ localPath, workingDirectory);
+ }
+
+ private FileInfo(FileDescriptor descriptor, PartsBitset parts, Path localPath,
+ Path workingDirectory) throws IOException {
+ this.descriptor = descriptor;
+ this.parts = parts;
+ if (localPath == null) {
+ this.localPath = workingDirectory.resolve(Paths.get(
+ DOWNLOADS_DIR,
+ Integer.toString(descriptor.getFileId()),
+ descriptor.getFileName()
+ ));
+ Files.createDirectories(this.localPath.getParent());
+ try (RandomAccessFile file = new RandomAccessFile(this.localPath.toString(), "rw")) {
+ file.setLength(descriptor.getFileSize());
+ }
+ } else {
+ this.localPath = localPath;
+ }
+ }
+
+ private void writeDataToOutputStream(DataOutputStream outputStream) throws IOException {
+ descriptor.writeInfoToOutputStream(outputStream);
+ parts.writeDataToOutputStream(outputStream);
+ outputStream.writeUTF(localPath.toString());
+ }
+
+ private static FileInfo readDataFromInputStream(DataInputStream inputStream) throws IOException {
+ FileDescriptor fileDescriptor = FileDescriptor.readInfoFromInputStream(inputStream, true);
+ PartsBitset parts = PartsBitset.readDataFromInputStream(inputStream, fileDescriptor.getNumberOfTheParts());
+ String localPath = inputStream.readUTF();
+ return new FileInfo(fileDescriptor, parts, Paths.get(localPath), null);
+ }
+ }
+
+ public interface StatusCallbacks {
+ void onTrackerUpdated(boolean result, Throwable e);
+
+ void onDownloadIssue(FileDescriptor descriptor, String message, Throwable e);
+
+ void onDownloadStart(FileDescriptor descriptor);
+
+ void onDownloadPart(FileDescriptor descriptor, int partId);
+
+ void onDownloadComplete(FileDescriptor descriptor);
+
+ void onPeerToPeerServerIssue(Throwable e);
+ }
+
+ public void setCallbacks(StatusCallbacks callbacks) {
+ this.callbacks = callbacks;
+ }
+
+ /*
+ * Send request to the server to get the list of distributed files
+ * using sendListRequest and readListResponse functions, defined in class TrackerConnection
+ */
+ public List list() throws IOException {
+ try (TrackerConnection connection = connectToTracker()) {
+ connection.sendListRequest();
+ return connection.readListResponse();
+ }
+ }
+
+ public boolean get(int id) throws Exception {
+ if (files.containsKey(id)) {
+ return false;
+ }
+ FileDescriptor serverDescriptor = list().stream()
+ .filter(descriptor -> descriptor.getFileId() == id)
+ .findAny().orElse(null);
+ if (serverDescriptor == null) {
+ return false;
+ }
+ try (MyLock myLock = MyLock.lock(lock.writeLock())) {
+ files.put(id, new FileInfo(serverDescriptor, null, workingDirectory));
+ }
+ return true;
+ }
+
+ public FileDescriptor newFile(Path path) throws Exception {
+ if (!Files.isRegularFile(path)) {
+ throw new IllegalArgumentException("File not exists or is not a regular file.");
+ }
+
+ FileDescriptor newDescriptor = new FileDescriptor(Files.size(path), path.getFileName().toString());
+ try (TrackerConnection connection = connectToTracker()) {
+ connection.sendUploadRequest(newDescriptor);
+ int newId = connection.readUploadResponse();
+ newDescriptor.setId(newId);
+ }
+ FileInfo newInfo = new FileInfo(newDescriptor, path, null);
+ try (MyLock myLock = MyLock.lock(lock.writeLock())) {
+ files.put(newDescriptor.getFileId(), newInfo);
+ }
+ return newDescriptor;
+ }
+
+ public void run() throws Exception {
+ try (MyLock myLock = MyLock.lock(lock.writeLock())) {
+ isRunning = true;
+
+ threadPool = Executors.newCachedThreadPool();
+ scheduler = Executors.newScheduledThreadPool(1);
+
+ // Starting download
+ for (FileInfo info : files.values()) {
+ if (info.parts.getCount() == info.descriptor.getNumberOfTheParts()) {
+ continue;
+ }
+ threadPool.submit(() -> {
+ try {
+ download(info);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ // Seeding server starts
+ serverSocket = new ServerSocket(0);
+ threadPool.submit(this::server);
+
+ // Tracking update loop starts
+ scheduler.scheduleAtFixedRate(this::updateTracker, 0, DELAY, TimeUnit.MILLISECONDS);
+ } catch (IOException e) {
+ threadPool.shutdownNow();
+ scheduler.shutdownNow();
+ isRunning = false;
+ throw e;
+ }
+ }
+
+ // Protocol requests
+
+ private List sources(Collection files) throws IOException {
+ try (TrackerConnection connection = connectToTracker()) {
+ connection.sendSourcesRequest(files);
+ return connection.readSourcesResponse();
+ }
+ }
+
+ private PartsBitset stat(InetSocketAddress seeder, FileInfo info) throws IOException {
+ try (PeerToPeerConnection connection = connectToSeeder(seeder)) {
+ connection.sendStatRequest(info.descriptor.getFileId());
+ return connection.readStatResponse(info.descriptor.getNumberOfTheParts());
+ }
+ }
+
+ private void get(InetSocketAddress seeder, FileInfo info, int partId) throws IOException {
+ try (PeerToPeerConnection connection = connectToSeeder(seeder)) {
+ connection.sendGetRequest(new Request(info.descriptor.getFileId(), partId));
+ try (RandomAccessFile file = new RandomAccessFile(info.localPath.toString(), "rw")) {
+ connection.readGetResponse(file, partId, info.descriptor);
+ }
+ }
+ }
+
+ // Seeding part
+
+ private boolean update(int port) throws Exception {
+ List availableFiles;
+ try (MyLock myLock = MyLock.lock(lock.readLock())) {
+ availableFiles = files
+ .values()
+ .stream()
+ .filter(fileInfo -> {
+ try (MyLock myLock1 = MyLock.lock(fileInfo.fileLock.readLock())) {
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return fileInfo.parts.getCount() > 0;
+ })
+ .map(fileInfo -> fileInfo.descriptor.getFileId())
+ .collect(Collectors.toList());
+ }
+ ClientDescriptor descriptor = new ClientDescriptor(new InetSocketAddress("", port), availableFiles);
+ try (TrackerConnection trackerConnection = connectToTracker()) {
+ trackerConnection.sendUpdateRequest(descriptor);
+ return trackerConnection.readUpdateResponse();
+ }
+ }
+
+ private void server() {
+ while (true) {
+ try {
+ Socket socket = serverSocket.accept();
+ threadPool.submit(() -> listen(socket));
+ } catch (IOException e) {
+ notifyPeerToPeerServerIssue(e);
+ break;
+ }
+ }
+ }
+
+ private void listen(Socket socket) {
+ try (PeerToPeerConnection connection = new PeerToPeerConnection(socket)) {
+ int request = connection.readRequest();
+ switch (request) {
+ case PeerToPeerConnection.STAT_REQUEST:
+ statRequestFunction(connection);
+ break;
+ case PeerToPeerConnection.GET_REQUEST:
+ getRequestFunction(connection);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Request %d from connection is incorrect!.", request)
+ );
+ }
+ } catch (Exception e) {
+ notifyPeerToPeerServerIssue(e);
+ }
+ }
+
+ private void statRequestFunction(PeerToPeerConnection connection) throws Exception {
+ int fileId = connection.readStatRequest();
+ FileInfo info;
+ try (MyLock myLock = MyLock.lock(lock.readLock())) {
+ info = files.get(fileId);
+ }
+ try (MyLock myLock = MyLock.lock(info.fileLock.readLock())) {
+ connection.writeStatResponse(info.parts);
+ }
+ }
+
+ private void getRequestFunction(PeerToPeerConnection connection) throws Exception {
+ Request request = connection.readGetRequest();
+ FileInfo info;
+ try (MyLock myLock = MyLock.lock(lock.readLock())) {
+ info = files.get(request.getFileId());
+ }
+ try (MyLock myLock = MyLock.lock(info.fileLock.readLock())) {
+ if (!info.parts.get(request.getPartId())) {
+ throw new IllegalArgumentException("Cannot get on missing file part.");
+ }
+ }
+ // We already checked that file has requested part, just read it without locking
+ try (RandomAccessFile file = new RandomAccessFile(info.localPath.toString(), "r")) {
+ connection.writeGetResponse(file, request.getPartId(), info.descriptor);
+ }
+ }
+
+ // Leeching part
+
+ private void updateTracker(){
+ try (MyLock myLock = MyLock.lock(lock.readLock())) {
+ if (!isRunning) {
+ return;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ try {
+ boolean result = update(serverSocket.getLocalPort());
+ notifyTrackerUpdated(result, null);
+ } catch (Exception e) {
+ notifyTrackerUpdated(false, e);
+ }
+ }
+
+ private void download(FileInfo info) throws Exception {
+ List seeders = null;
+ int currentSeeder = 0;
+ PartsBitset seederParts = null;
+ int canOffer = 0;
+ notifyDownloadStart(info.descriptor);
+ while (true) {
+ try (MyLock myLock = MyLock.lock(info.fileLock.readLock())) {
+ if (!isRunning) {
+ return;
+ }
+ if (info.parts.getCount() == info.descriptor.getNumberOfTheParts()) {
+ notifyDownloadComplete(info.descriptor);
+ return;
+ }
+ }
+
+ if (seeders == null || seeders.size() == 0) {
+ try {
+ seeders = sources(Collections.singletonList(info.descriptor.getFileId()));
+ currentSeeder = -1;
+ canOffer = 0;
+ } catch (IOException e) {
+ notifyDownloadIssue(info.descriptor, "Failed to fetch seeders.", e);
+ delay(DELAY);
+ continue;
+ }
+ }
+ if (seeders == null || seeders.size() == 0) {
+ notifyDownloadIssue(info.descriptor, "No seeders.", null);
+ delay(DELAY);
+ continue;
+ }
+
+ if (canOffer == 0 && currentSeeder + 1 < seeders.size()) {
+ currentSeeder++;
+ try {
+ seederParts = stat(seeders.get(currentSeeder), info);
+ } catch (IOException e) {
+ notifyDownloadIssue(info.descriptor, String.format(
+ "Failed to stat seeder %s",
+ seeders.get(currentSeeder).toString()
+ ), e);
+ continue;
+ }
+ try (MyLock myLock = MyLock.lock(info.fileLock.readLock())) {
+ seederParts.subtract(info.parts);
+ }
+ canOffer = seederParts.getCount();
+ }
+
+ if (canOffer == 0) {
+ if (currentSeeder == seeders.size() - 1) {
+ seeders = null;
+ }
+ notifyDownloadIssue(info.descriptor, "No one seed remaining parts.", null);
+ delay(DELAY);
+ continue;
+ }
+
+ int partId = 0;
+ if (canOffer > 0) {
+ partId = seederParts.getFirstBitAtLeast(partId);
+ try {
+ get(seeders.get(currentSeeder), info, partId);
+ } catch (IOException e) {
+ notifyDownloadIssue(info.descriptor, String.format(
+ "Downloading error: part %d from seeder %s.",
+ partId,
+ seeders.get(currentSeeder).toString()
+ ), e);
+ delay(DELAY);
+ }
+ boolean needUpdateTracker = false;
+ try (MyLock myLock = MyLock.lock(info.fileLock.writeLock())) {
+ info.parts.set(partId, true);
+ if (info.parts.getCount() == 1) {
+ needUpdateTracker = true;
+ }
+ }
+ seederParts.set(partId, false);
+ canOffer--;
+ if (needUpdateTracker) {
+ updateTracker();
+ }
+ notifyDownloadPart(info.descriptor, partId);
+ }
+ }
+ }
+
+ private void store() throws IOException {
+ Path info = workingDirectory.resolve(info_FILE);
+ if (!Files.exists(info)) {
+ Files.createDirectories(workingDirectory);
+ Files.createFile(info);
+ }
+ try (DataOutputStream outputStream = new DataOutputStream(Files.newOutputStream(info))) {
+ ReadWriteHelper.writeCollection(outputStream, files.values(),
+ (outputStream1, w) -> w.writeDataToOutputStream(outputStream1));
+ }
+ }
+
+ private void load() throws IOException {
+ Path info = workingDirectory.resolve(info_FILE);
+ if (Files.exists(info)) {
+ try (DataInputStream inputStream = new DataInputStream(Files.newInputStream(info))) {
+ int size = inputStream.readInt();
+ files = new HashMap<>(size);
+ while (size > 0) {
+ --size;
+ FileInfo fs = FileInfo.readDataFromInputStream(inputStream);
+ files.put(fs.descriptor.getFileId(), fs);
+ }
+ }
+ } else {
+ files = new HashMap<>();
+ }
+ }
+
+ private TrackerConnection connectToTracker() throws IOException {
+ return new TrackerConnection(new Socket(host, TrackerConnection.PORT));
+ }
+
+ private PeerToPeerConnection connectToSeeder(InetSocketAddress seeder) throws IOException {
+ return new PeerToPeerConnection(new Socket(seeder.getAddress(), seeder.getPort()));
+ }
+
+ private void delay(long time) {
+ try {
+ Thread.sleep(time);
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ private void notifyTrackerUpdated(boolean result, Throwable e) {
+ if (callbacks != null) {
+ callbacks.onTrackerUpdated(result, e);
+ }
+ }
+
+ private void notifyDownloadIssue(FileDescriptor descriptor, String message, Throwable e) {
+ if (callbacks != null) {
+ callbacks.onDownloadIssue(descriptor, message, e);
+ }
+ }
+
+ private void notifyDownloadComplete(FileDescriptor descriptor) {
+ if (callbacks != null) {
+ callbacks.onDownloadComplete(descriptor);
+ }
+ }
+
+ private void notifyPeerToPeerServerIssue(Throwable e) {
+ if (callbacks != null) {
+ callbacks.onPeerToPeerServerIssue(e);
+ }
+ }
+
+ private void notifyDownloadStart(FileDescriptor descriptor) {
+ if (callbacks != null) {
+ callbacks.onDownloadStart(descriptor);
+ }
+ }
+
+ private void notifyDownloadPart(FileDescriptor descriptor, int partId) {
+ if (callbacks != null) {
+ callbacks.onDownloadPart(descriptor, partId);
+ }
+ }
+
+}
diff --git a/src/main/java/ru/spbau/mit/ClientDescriptor.java b/src/main/java/ru/spbau/mit/ClientDescriptor.java
new file mode 100644
index 0000000..5220792
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/ClientDescriptor.java
@@ -0,0 +1,45 @@
+package ru.spbau.mit;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by Анастасия on 10.04.2016.
+ */
+public final class ClientDescriptor {
+ private InetSocketAddress address;
+ private List idList;
+
+ public ClientDescriptor(InetSocketAddress address, List idList) {
+ this.address = address;
+ this.idList = idList;
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ public List getIdList() {
+ return idList;
+ }
+
+ public void writeInfoToOutputStream(DataOutputStream outputStream) throws IOException {
+ //Write address to output stream
+ ReadWriteHelper.writeAddress(outputStream, address);
+ //Write idList to output stream
+ ReadWriteHelper.writeCollection(outputStream, idList, DataOutputStream::writeInt);
+ }
+
+ public static ClientDescriptor readInfoFromInputStream(DataInputStream inputStream) throws IOException {
+ return new ClientDescriptor(
+ //Read address from input stream
+ ReadWriteHelper.readAddress(inputStream),
+ //Read idList from input stream
+ ReadWriteHelper.readCollection(inputStream, new ArrayList<>(), DataInputStream::readInt));
+ }
+
+}
diff --git a/src/main/java/ru/spbau/mit/Connection.java b/src/main/java/ru/spbau/mit/Connection.java
new file mode 100644
index 0000000..f4bdfb3
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/Connection.java
@@ -0,0 +1,67 @@
+package ru.spbau.mit;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collection;
+
+/**
+ * Created by Анастасия on 10.04.2016.
+ */
+public abstract class Connection implements AutoCloseable {
+ private Socket socket;
+ private DataInputStream inputStream;
+ private DataOutputStream outputStream;
+
+ //Constructor for our abstract class
+ protected Connection(Socket socket) throws IOException {
+ this.socket = socket;
+ //Creating new input and output streams using socket
+ inputStream = new DataInputStream(socket.getInputStream());
+ outputStream = new DataOutputStream(socket.getOutputStream());
+ }
+
+ @Override
+ public void close() {
+ try {
+ socket.close();
+ } catch (IOException ignoredException) {
+ }
+ }
+
+ public DataOutputStream getOutputStream() {
+ return outputStream;
+ }
+
+ public DataInputStream getInputStream() {
+ return inputStream;
+ }
+
+ //We need this function in Tracker class in update() method
+ public String getHost() {
+ return ((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName();
+ }
+
+ /*
+ * writeCollection function for the connection, using ReadWriteHelper writeConnection function
+ * we need it in methods of class TrackerConnection
+ */
+ protected void writeCollection(Collection collection,
+ ReadWriteHelper.Writer super T> writer) throws IOException {
+ ReadWriteHelper.writeCollection(outputStream, collection, writer);
+ }
+
+ //readCollection function for the connection, using ReadWriteHelper readConnection function
+ protected > R readCollection(R collection,
+ ReadWriteHelper.Reader extends T> reader)
+ throws IOException {
+ return ReadWriteHelper.readCollection(inputStream, collection, reader);
+ }
+
+ //We need this function in listenConnection function in class Tracker
+ public int readRequest() throws IOException {
+ return inputStream.readUnsignedByte();
+ }
+}
diff --git a/src/main/java/ru/spbau/mit/FileDescriptor.java b/src/main/java/ru/spbau/mit/FileDescriptor.java
new file mode 100644
index 0000000..758bed0
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/FileDescriptor.java
@@ -0,0 +1,112 @@
+package ru.spbau.mit;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Created by Анастасия on 10.04.2016.
+ */
+public final class FileDescriptor {
+ public static final int PART_SIZE = 1024 * 1024 * 10; //Size of one part of the file - 10M
+
+ private boolean hasFileGotId;
+ private int fileId;
+ private long fileSize;
+ private String fileName;
+
+ //Constructor for our class with id
+ public FileDescriptor(int id, long size, String name) {
+ fileId = id;
+ hasFileGotId = true;
+ fileSize = size;
+ fileName = name;
+ }
+
+ //Constructor for our class without id
+ public FileDescriptor(long size, String name) {
+ hasFileGotId = false;
+ fileSize = size;
+ fileName = name;
+ }
+
+ public boolean hasFileGotId() {
+ return hasFileGotId;
+ }
+
+ public int getFileId() {
+ return fileId;
+ }
+
+ public long getFileSize() {
+ return fileSize;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setId(int id) {
+ fileId = id;
+ hasFileGotId = true;
+ }
+
+ public int getNumberOfTheParts() {
+ /*
+ * We need to add PART_SIZE to fileSize because our last part can be less than 10M
+ * and we need to subtract 1 because our last part can be 10M
+ */
+ return (int) (fileSize + PART_SIZE - 1) / PART_SIZE;
+ }
+
+ public int getPartSize(int partId) {
+ /*
+ * If we want to get size of the any part except last part, we return 10M
+ * or if PART_SIZE is divisor of fileSize, we also return 10M
+ */
+ if ((partId < getNumberOfTheParts() - 1) || (fileSize % PART_SIZE == 0)) {
+ return PART_SIZE;
+ }
+ return (int) fileSize % PART_SIZE;
+ }
+
+ public void writeInfoToOutputStream(DataOutputStream outputStream) throws IOException {
+ //If the file has id, we'll write it
+ if (hasFileGotId) {
+ outputStream.writeInt(fileId);
+ }
+ //Write size of file and it's name
+ outputStream.writeLong(fileSize);
+ outputStream.writeUTF(fileName);
+ }
+
+ public static FileDescriptor readInfoFromInputStream(DataInputStream inputStream, boolean hasFileGotId)
+ throws IOException {
+ //We create FileDescriptor object in two different ways, depends on has our file got id or not
+ if (hasFileGotId) {
+ int fileId = inputStream.readInt();
+ long fileSize = inputStream.readLong();
+ String fileName = inputStream.readUTF();
+ return new FileDescriptor(fileId, fileSize, fileName);
+ } else {
+ long fileSize = inputStream.readLong();
+ String fileName = inputStream.readUTF();
+ return new FileDescriptor(fileSize, fileName);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof FileDescriptor)) {
+ return false;
+ }
+ FileDescriptor that = (FileDescriptor) obj;
+ return this.hasFileGotId == that.hasFileGotId
+ && this.fileId == that.fileId
+ && Objects.equals(this.fileName, that.fileName)
+ && this.fileSize == that.fileSize;
+ }
+
+
+}
diff --git a/src/main/java/ru/spbau/mit/MyLock.java b/src/main/java/ru/spbau/mit/MyLock.java
new file mode 100644
index 0000000..690c371
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/MyLock.java
@@ -0,0 +1,42 @@
+package ru.spbau.mit;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Created by Анастасия on 11.04.2016.
+ */
+
+//Wrapper over class Lock
+public final class MyLock implements AutoCloseable {
+ private Lock lock;
+ private static final Queue locks = new ArrayDeque<>();
+
+ private MyLock() {};
+
+ //Create our own function lock returns MyLock
+ public static MyLock lock(Lock lock) {
+ MyLock res;
+ synchronized (locks) {
+ res = locks.poll(); //Set the head of the locks queue as a result (or null if the queue is empty)
+ }
+
+ //If the result is null, we'll create new MyLock
+ if (res == null) {
+ res = new MyLock();
+ }
+ res.lock = lock;
+ //Acquires the lock
+ lock.lock();
+ return res;
+ }
+
+ @Override
+ public void close() throws Exception {
+ lock.unlock();
+ synchronized (locks) {
+ locks.add(this); //When we release the lock, we add current MyLock object to the locks queue
+ }
+ }
+}
diff --git a/src/main/java/ru/spbau/mit/PartsBitset.java b/src/main/java/ru/spbau/mit/PartsBitset.java
new file mode 100644
index 0000000..283396c
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/PartsBitset.java
@@ -0,0 +1,79 @@
+package ru.spbau.mit;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Created by Анастасия on 11.04.2016.
+ */
+public class PartsBitset {
+ private int cnt = 0;
+ private boolean[] flags;
+
+ public PartsBitset(int size, boolean defaultValue) {
+ flags = new boolean[size];
+ if (defaultValue) {
+ Arrays.fill(flags, true);
+ cnt = size;
+ }
+ }
+
+ public boolean get(int pos) {
+ return flags[pos];
+ }
+
+ public void set(int pos, boolean value) {
+ if (flags[pos] == value) {
+ return;
+ }
+ flags[pos] = !flags[pos];
+ if (flags[pos]) {
+ cnt++;
+ } else {
+ cnt--;
+ }
+ }
+
+ public int getCount() {
+ return cnt;
+ }
+
+ public void writeDataToOutputStream(DataOutputStream outputStream) throws IOException {
+ outputStream.writeInt(cnt);
+ for (int i = 0; i != flags.length; i++) {
+ if (get(i)) {
+ outputStream.writeInt(i);
+ }
+ }
+ }
+
+ public static PartsBitset readDataFromInputStream(DataInputStream inputStream, int size) throws IOException {
+ PartsBitset result = new PartsBitset(size, false);
+ int count = inputStream.readInt();
+ while (count > 0) {
+ count--;
+ result.set(inputStream.readInt(), true);
+ }
+ return result;
+ }
+
+ public void subtract(PartsBitset other) {
+ assert (other.flags.length == flags.length);
+ for (int i = 0; i != flags.length; i++) {
+ if (other.get(i)) {
+ set(i, false);
+ }
+ }
+ }
+
+ public int getFirstBitAtLeast(int pos) {
+ for (int i = pos; i != flags.length; i++) {
+ if (get(i)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+}
diff --git a/src/main/java/ru/spbau/mit/PeerToPeerConnection.java b/src/main/java/ru/spbau/mit/PeerToPeerConnection.java
new file mode 100644
index 0000000..31d8a8a
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/PeerToPeerConnection.java
@@ -0,0 +1,87 @@
+package ru.spbau.mit;
+
+import java.io.*;
+import java.net.Socket;
+
+/**
+ * Created by Анастасия on 11.04.2016.
+ */
+public class PeerToPeerConnection extends Connection {
+ public static final int STAT_REQUEST = 1;
+ public static final int GET_REQUEST = 2;
+ private static final int BUFFER_SIZE = 4096;
+
+ protected PeerToPeerConnection(Socket socket) throws IOException {
+ super(socket);
+ }
+
+ //Methods related to stat() function
+
+ public void sendStatRequest(int fileId) throws IOException {
+ DataOutputStream outputStream = getOutputStream();
+ outputStream.writeByte(STAT_REQUEST);
+ outputStream.writeInt(fileId);
+ outputStream.flush();
+ }
+
+ public int readStatRequest() throws IOException {
+ return getInputStream().readInt();
+ }
+
+ public void writeStatResponse(PartsBitset parts) throws IOException {
+ DataOutputStream outputStream = getOutputStream();
+ parts.writeDataToOutputStream(outputStream);
+ outputStream.flush();
+ }
+
+ public PartsBitset readStatResponse(int size) throws IOException {
+ return PartsBitset.readDataFromInputStream(getInputStream(), size);
+ }
+
+ //Methods related to get() function
+
+ public void sendGetRequest(Request request) throws IOException {
+ DataOutputStream outputStream = getOutputStream();
+ outputStream.writeByte(GET_REQUEST);
+ request.writeRequestToOutputStream(outputStream);
+ outputStream.flush();
+ }
+
+ public Request readGetRequest() throws IOException {
+ return Request.readRequestFromInputStream(getInputStream());
+ }
+
+ public void writeGetResponse(RandomAccessFile from, int partId, FileDescriptor descriptor) throws IOException {
+ from.seek(FileDescriptor.PART_SIZE * partId);
+ int amount = descriptor.getPartSize(partId);
+
+ DataOutputStream outputStream = getOutputStream();
+ byte[] buffer = new byte[BUFFER_SIZE];
+ while (amount > 0) {
+ int read = from.read(buffer, 0, Math.min(amount, BUFFER_SIZE));
+ if (read == -1) {
+ throw new EOFException("File is shorter than recorded size.");
+ }
+ amount -= read;
+ outputStream.write(buffer, 0, read);
+ }
+ outputStream.flush();
+ }
+
+ public void readGetResponse(RandomAccessFile to, int partId, FileDescriptor descriptor) throws IOException {
+ to.seek(FileDescriptor.PART_SIZE * partId);
+ int amount = descriptor.getPartSize(partId);
+
+ DataInputStream dis = getInputStream();
+ byte[] buffer = new byte[BUFFER_SIZE];
+ while (amount > 0) {
+ int read = dis.read(buffer, 0, Math.min(amount, BUFFER_SIZE));
+ if (read == -1) {
+ throw new EOFException("Cannot read the end of the file from socket.");
+ }
+ amount -= read;
+ to.write(buffer, 0, read);
+ }
+ }
+
+}
diff --git a/src/main/java/ru/spbau/mit/ReadWriteHelper.java b/src/main/java/ru/spbau/mit/ReadWriteHelper.java
new file mode 100644
index 0000000..a49d491
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/ReadWriteHelper.java
@@ -0,0 +1,68 @@
+package ru.spbau.mit;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Writer;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+/**
+ * Created by Анастасия on 10.04.2016.
+ */
+
+/**
+ * This class contains functions that help us to read and write to/from output/input stream address and collections
+ */
+public abstract class ReadWriteHelper {
+ private static final int IP_LENGTH = 4; //We need it to read IP from the DataInputStream while reading address
+
+ public static void writeAddress(DataOutputStream outputStream, InetSocketAddress address) throws IOException {
+ outputStream.write(address.getAddress().getAddress()); //Write IP address to the output stream
+ outputStream.writeShort(address.getPort()); //Write port number to the output stream
+ }
+
+ public static InetSocketAddress readAddress(DataInputStream inputStream) throws IOException {
+ //Read IP address
+ byte[] buffer = new byte[IP_LENGTH];
+ for (int i = 0; i < IP_LENGTH; i++) {
+ buffer[i] = inputStream.readByte();
+ }
+ //Read port number
+ int port = inputStream.readUnsignedShort();
+ //Create InetSocketAddress object using InetSocketAddress(InetAddress, int) constructor
+ return new InetSocketAddress(InetAddress.getByAddress(buffer), port);
+ }
+
+ //Next interfaces are needed in reading/writing collections with parameter T
+ public interface Writer {
+ void write(DataOutputStream outputStream, T value) throws IOException;
+ }
+
+ public interface Reader {
+ T read(DataInputStream inputStream) throws IOException;
+ }
+
+ public static void writeCollection(DataOutputStream outputStream, Collection collection,
+ Writer super T> writer) throws IOException {
+ //Write size of collection
+ outputStream.writeInt(collection.size());
+ //Write all elements of collection
+ for (T element : collection) {
+ writer.write(outputStream, element);
+ }
+ }
+
+ public static > R readCollection(DataInputStream inputStream, R collection,
+ Reader extends T> reader) throws IOException {
+ //Read size of collection
+ int collectionSize = inputStream.readInt();
+ //Read all elements and add to collection
+ for (int i = 0; i < collectionSize; i++) {
+ collection.add(reader.read(inputStream));
+ }
+ return collection;
+ }
+
+}
diff --git a/src/main/java/ru/spbau/mit/Request.java b/src/main/java/ru/spbau/mit/Request.java
new file mode 100644
index 0000000..17ec78a
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/Request.java
@@ -0,0 +1,38 @@
+package ru.spbau.mit;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Created by Анастасия on 10.04.2016.
+ */
+public class Request {
+ private int fileId;
+ private int partId;
+
+ public Request(int fileId, int partId) {
+ this.fileId = fileId;
+ this.partId = partId;
+ }
+
+ public int getFileId() {
+ return fileId;
+ }
+
+ public int getPartId() {
+ return partId;
+ }
+
+ public void writeRequestToOutputStream(DataOutputStream outputStream) throws IOException {
+ outputStream.writeInt(fileId);
+ outputStream.writeInt(partId);
+ }
+
+ public static Request readRequestFromInputStream(DataInputStream inputStream) throws IOException {
+ return new Request(
+ inputStream.readInt(), //Read fileId
+ inputStream.readInt() //Read partId
+ );
+ }
+}
diff --git a/src/main/java/ru/spbau/mit/Tracker.java b/src/main/java/ru/spbau/mit/Tracker.java
new file mode 100644
index 0000000..ae2c044
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/Tracker.java
@@ -0,0 +1,183 @@
+package ru.spbau.mit;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * Created by Анастасия on 11.04.2016.
+ */
+public class Tracker implements AutoCloseable {
+ private static final String STATE_FILE = "tracker-state.dat";
+
+ private Path workingDirectory;
+ private ExecutorService threadPool;
+ private ScheduledExecutorService scheduler;
+ private ServerSocket serverSocket;
+ private List files;
+ private Map> seeders;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ public Tracker(Path workingDirectory) throws Exception {
+ try (MyLock myLock = MyLock.lock(lock.writeLock())) {
+ this.workingDirectory = workingDirectory;
+ serverSocket = new ServerSocket(TrackerConnection.PORT);
+ threadPool = Executors.newCachedThreadPool();
+ scheduler = Executors.newScheduledThreadPool(1);
+ load();
+ }
+ threadPool.submit(this::work);
+ }
+
+ @Override
+ public void close() throws Exception {
+ try (MyLock myLock = MyLock.lock(lock.writeLock())) {
+ serverSocket.close();
+ }
+ threadPool.shutdown();
+ scheduler.shutdown();
+ store();
+ }
+
+ private void work() {
+ while (true) {
+ try {
+ Socket socket = serverSocket.accept();
+ if (socket == null) {
+ return;
+ }
+ //We listen the connection for a request byte
+ threadPool.submit(() -> listenConnection(socket));
+ } catch (IOException e) {
+ e.printStackTrace();
+ break;
+ }
+ }
+ }
+
+ private void listenConnection(Socket socket) {
+ try (TrackerConnection connection = new TrackerConnection(socket)) {
+ //We read request byte and choose right function
+ int request = connection.readRequest();
+ switch (request) {
+ case TrackerConnection.LIST_REQUEST:
+ list(connection);
+ break;
+ case TrackerConnection.UPLOAD_REQUEST:
+ upload(connection);
+ break;
+ case TrackerConnection.SOURCES_REQUEST:
+ sources(connection);
+ break;
+ case TrackerConnection.UPDATE_REQUEST:
+ update(connection);
+ break;
+ default:
+ System.err.printf("Request: %d is not correct!\n", request);
+ break;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void load() throws IOException {
+ Path path = workingDirectory.resolve(STATE_FILE);
+ if (Files.exists(path)) {
+ try (DataInputStream inputStream = new DataInputStream(Files.newInputStream(path))) {
+ files = ReadWriteHelper.readCollection(inputStream, new ArrayList<>(),
+ inputStream1 -> FileDescriptor.readInfoFromInputStream(inputStream1, true));
+ }
+ } else {
+ files = new ArrayList<>();
+ }
+ seeders = new HashMap<>();
+ }
+
+ private void store() throws IOException {
+ Path path = workingDirectory.resolve(STATE_FILE);
+ if (!Files.exists(path)) {
+ Files.createDirectories(workingDirectory);
+ Files.createFile(path);
+ }
+ try (DataOutputStream outputStream = new DataOutputStream(Files.newOutputStream(path))) {
+ ReadWriteHelper.writeCollection(outputStream, files,
+ (outputStream1, w) -> w.writeInfoToOutputStream(outputStream1));
+ }
+ }
+
+ private void list(TrackerConnection connection) throws Exception {
+ try (MyLock myLock = MyLock.lock(lock.readLock())) {
+ connection.writeListResponse(files);
+ }
+ }
+
+ private void upload(TrackerConnection connection) throws Exception {
+ FileDescriptor fileDescriptor = connection.readUploadRequest();
+ try (MyLock myLock = MyLock.lock(lock.writeLock())) {
+ int newId = files.size();
+ fileDescriptor.setId(newId);
+ files.add(fileDescriptor);
+ }
+ connection.writeUploadResponse(fileDescriptor.getFileId());
+ }
+
+ private void sources(TrackerConnection connection) throws Exception {
+ List request = connection.readSourcesRequest();
+ List result;
+ try (MyLock myLock = MyLock.lock(lock.readLock())) {
+ result = request.stream()
+ .flatMap(i -> seeders
+ .getOrDefault(i, Collections.emptySet())
+ .stream()
+ )
+ .distinct()
+ .map(ClientDescriptor::getAddress)
+ .collect(Collectors.toList());
+ }
+ connection.writeSourcesResponse(result);
+ }
+
+ private void update(TrackerConnection connection) throws Exception {
+ ClientDescriptor receivedClientDescriptor = connection.readUpdateRequest();
+ ClientDescriptor clientDescriptor = new ClientDescriptor(
+ new InetSocketAddress(connection.getHost(), receivedClientDescriptor.getAddress().getPort()),
+ receivedClientDescriptor.getIdList()
+ );
+ try (MyLock myLock = MyLock.lock(lock.writeLock())) {
+ for (int id : clientDescriptor.getIdList()) {
+ if (seeders.get(id) == null) {
+ seeders.put(id, new HashSet<>());
+ }
+ seeders.get(id).add(clientDescriptor);
+ }
+ }
+
+ scheduler.schedule(() -> {
+ try (MyLock myLock = MyLock.lock(lock.writeLock())) {
+ for (int id : clientDescriptor.getIdList()) {
+ seeders.get(id).remove(clientDescriptor);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }, TrackerConnection.UPDATE_DELAY, TimeUnit.MILLISECONDS);
+
+ connection.writeUpdateResponse(true);
+ }
+
+
+}
diff --git a/src/main/java/ru/spbau/mit/TrackerConnection.java b/src/main/java/ru/spbau/mit/TrackerConnection.java
new file mode 100644
index 0000000..4d4b834
--- /dev/null
+++ b/src/main/java/ru/spbau/mit/TrackerConnection.java
@@ -0,0 +1,128 @@
+package ru.spbau.mit;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Created by Анастасия on 11.04.2016.
+ */
+public class TrackerConnection extends Connection {
+
+ public static final int PORT = 8081;
+ public static final int UPDATE_DELAY = 60 * 1000;
+
+ public static final int LIST_REQUEST = 1;
+ public static final int UPLOAD_REQUEST = 2;
+ public static final int SOURCES_REQUEST = 3;
+ public static final int UPDATE_REQUEST = 4;
+
+ //Constructor for our class same as constructor for Connection class
+ protected TrackerConnection(Socket socket) throws IOException {
+ super(socket);
+ }
+
+ //Methods related to list() function
+
+ public void sendListRequest() throws IOException {
+ DataOutputStream outputStream = getOutputStream();
+ outputStream.writeByte(LIST_REQUEST);
+ outputStream.flush();
+ }
+
+ public void writeListResponse(Collection files) throws IOException {
+ writeCollection(files, (outputStream, fileDescriptor) -> {
+ if (!fileDescriptor.hasFileGotId()) {
+ throw new IllegalStateException("Uploaded files must have id.");
+ }
+ fileDescriptor.writeInfoToOutputStream(outputStream);
+ });
+ getOutputStream().flush();
+ }
+
+ public List readListResponse() throws IOException {
+ return readCollection(new ArrayList<>(),
+ (inputStream) -> FileDescriptor.readInfoFromInputStream(inputStream, true));
+ }
+
+ //Methods related to upload() function
+
+ public void sendUploadRequest(FileDescriptor file) throws IOException {
+ if (file.hasFileGotId()) {
+ throw new IllegalStateException("File, we want to upload, can't have id!");
+ }
+ DataOutputStream outputStream = getOutputStream();
+ outputStream.writeByte(UPLOAD_REQUEST);
+ file.writeInfoToOutputStream(outputStream);
+ outputStream.flush();
+ }
+
+ public FileDescriptor readUploadRequest() throws IOException {
+ return FileDescriptor.readInfoFromInputStream(getInputStream(), false);
+ }
+
+ public void writeUploadResponse(int fileId) throws IOException {
+ DataOutputStream outputStream = getOutputStream();
+ outputStream.writeInt(fileId);
+ outputStream.flush();
+ }
+
+ public int readUploadResponse() throws IOException {
+ return getInputStream().readInt();
+ }
+
+ //Methods related to sources() function
+
+ public void sendSourcesRequest(Collection idList) throws IOException {
+ if (idList.size() == 0) {
+ throw new IllegalStateException("Error! idList is empty!");
+ }
+ DataOutputStream outputStream = getOutputStream();
+ outputStream.writeByte(SOURCES_REQUEST);
+ writeCollection(idList, DataOutputStream::writeInt);
+ outputStream.flush();
+ }
+
+ public List readSourcesRequest() throws IOException {
+ return readCollection(new ArrayList<>(), DataInputStream::readInt);
+ }
+
+ public void writeSourcesResponse(Collection addresses) throws IOException {
+ writeCollection(addresses, ReadWriteHelper::writeAddress);
+ }
+
+ public List readSourcesResponse() throws IOException {
+ return readCollection(new ArrayList<>(), ReadWriteHelper::readAddress);
+ }
+
+ //Methods related to update() function
+
+ public void sendUpdateRequest(ClientDescriptor clientDescriptor) throws IOException {
+ if (clientDescriptor.getIdList().size() == 0) {
+ throw new IllegalStateException("Error! idList is empty!");
+ }
+ DataOutputStream outputStream = getOutputStream();
+ outputStream.writeByte(UPDATE_REQUEST);
+ clientDescriptor.writeInfoToOutputStream(outputStream);
+ outputStream.flush();
+ }
+
+ public ClientDescriptor readUpdateRequest() throws IOException {
+ return ClientDescriptor.readInfoFromInputStream(getInputStream());
+ }
+
+ public void writeUpdateResponse(boolean isSuccessful) throws IOException {
+ DataOutputStream outputStream = getOutputStream();
+ outputStream.writeBoolean(isSuccessful);
+ outputStream.flush();
+ }
+
+ public boolean readUpdateResponse() throws IOException {
+ return getInputStream().readBoolean();
+ }
+}
diff --git a/src/test/client-01 b/src/test/client-01
new file mode 100644
index 0000000..2a11f8b
--- /dev/null
+++ b/src/test/client-01
@@ -0,0 +1 @@
+client
\ No newline at end of file
diff --git a/src/test/client-02 b/src/test/client-02
new file mode 100644
index 0000000..b051c6c
--- /dev/null
+++ b/src/test/client-02
@@ -0,0 +1 @@
+client
diff --git a/src/test/client-03 b/src/test/client-03
new file mode 100644
index 0000000..b051c6c
--- /dev/null
+++ b/src/test/client-03
@@ -0,0 +1 @@
+client
diff --git a/src/test/java/ru/spbau/mit/Tests.java b/src/test/java/ru/spbau/mit/Tests.java
new file mode 100644
index 0000000..e021ce7
--- /dev/null
+++ b/src/test/java/ru/spbau/mit/Tests.java
@@ -0,0 +1,189 @@
+package ru.spbau.mit;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.*;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by Анастасия on 11.04.2016.
+ */
+public class Tests {
+ private static final Path EXAMPLE_PATH = Paths.get("src", "test", "mainFile");
+ private static final Path TRACKER_DIR = Paths.get("test", "tracker");
+ private static final Path CLIENT1_DIR = Paths.get("test", "client-01");
+ private static final Path CLIENT2_DIR = Paths.get("test", "client-02");
+ private static final Path CLIENT3_DIR = Paths.get("test", "client-03");
+ private static final long TIME_LIMIT = 70 * 1000L;
+
+ @Test
+ public void listAndUploadTest() throws Throwable {
+ try (
+ Tracker tracker = new Tracker(TRACKER_DIR);
+ Client client1 = new Client("localhost", CLIENT1_DIR);
+ Client client2 = new Client("localhost", CLIENT2_DIR)
+ ) {
+ assertAllCollectionEquals(Collections.emptyList(), client1.list(), client2.list());
+
+ FileDescriptor descriptor1 = client1.newFile(EXAMPLE_PATH);
+ FileDescriptor descriptor2 = client2.newFile(EXAMPLE_PATH);
+ assertNotEquals("Ids should be different", descriptor1.getFileId(), descriptor2.getFileId());
+
+ assertAllCollectionEquals(Arrays.asList(descriptor1, descriptor2), client1.list(), client2.list());
+ }
+ }
+
+ @Test
+ public void listConsistencyTest() throws Throwable {
+ try (Client client = new Client("localhost", CLIENT1_DIR)) {
+ FileDescriptor descriptor;
+ try (Tracker tracker = new Tracker(TRACKER_DIR)) {
+ descriptor = client.newFile(EXAMPLE_PATH);
+ }
+
+ List list;
+ try (Tracker tracker = new Tracker(TRACKER_DIR)) {
+ list = client.list();
+ }
+ assertEquals(Collections.singletonList(descriptor), list);
+ }
+ }
+
+ @Test(timeout = TIME_LIMIT)
+ public void testDownload() throws Throwable {
+ final DownloadWaiter waiter2 = new DownloadWaiter();
+ final DownloadWaiter waiter3 = new DownloadWaiter();
+ FileDescriptor descriptor;
+ try (
+ Tracker tracker = new Tracker(TRACKER_DIR);
+ Client client2 = new Client("localhost", CLIENT2_DIR)
+ ) {
+ client2.setCallbacks(waiter2);
+ try (Client client1 = new Client("localhost", CLIENT1_DIR)) {
+ descriptor = client1.newFile(EXAMPLE_PATH);
+ assertTrue(client2.get(descriptor.getFileId()));
+
+ // Seeding
+ client1.run();
+ // Leeching
+ client2.run();
+
+ synchronized (waiter2) {
+ while (!waiter2.ready) {
+ waiter2.wait();
+ }
+ }
+ }
+
+ //Testing that now client2 seeding
+ try (Client client3 = new Client("localhost", CLIENT3_DIR)) {
+ client3.setCallbacks(waiter3);
+ assertTrue(client3.get(descriptor.getFileId()));
+
+ //And leeching
+ client3.run();
+ synchronized (waiter3) {
+ while (!waiter3.ready) {
+ waiter3.wait();
+ }
+ }
+ }
+ }
+
+ Path downloadedPath = Paths.get(
+ "downloads",
+ Integer.toString(descriptor.getFileId()),
+ EXAMPLE_PATH.getFileName().toString()
+ );
+ assertTrue("Downloaded file is different!", FileUtils.contentEquals(
+ EXAMPLE_PATH.toFile(),
+ CLIENT2_DIR.resolve(downloadedPath).toFile()
+ ));
+ assertTrue("Downloaded file is different!", FileUtils.contentEquals(
+ EXAMPLE_PATH.toFile(),
+ CLIENT3_DIR.resolve(downloadedPath).toFile()
+ ));
+ }
+
+ @Before
+ @After
+ public void clear() throws IOException {
+ clearDirectory("test");
+ }
+
+ private void clearDirectory(String name) throws IOException {
+ Path path = Paths.get(name);
+ if (!Files.exists(path)) {
+ return;
+ }
+
+ Files.walkFileTree(path, new Deleter());
+ }
+
+ private void assertAllCollectionEquals(Collection> expected, Collection>... others) {
+ for (Collection> other : others) {
+ assertEquals(expected, other);
+ }
+ }
+
+ private static class Deleter extends SimpleFileVisitor {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ Files.delete(file);
+ return super.visitFile(file, attrs);
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ Files.delete(dir);
+ return super.postVisitDirectory(dir, exc);
+ }
+ }
+
+ private static final class DownloadWaiter implements Client.StatusCallbacks {
+ private boolean ready = false;
+
+ private DownloadWaiter() {
+ }
+
+ @Override
+ public void onTrackerUpdated(boolean result, Throwable e) {
+ }
+
+ @Override
+ public void onDownloadIssue(FileDescriptor descriptor, String message, Throwable e) {
+ }
+
+ @Override
+ public void onDownloadStart(FileDescriptor descriptor) {
+ }
+
+ @Override
+ public void onDownloadPart(FileDescriptor descriptor, int partId) {
+ }
+
+ @Override
+ public void onDownloadComplete(FileDescriptor descriptor) {
+ synchronized (this) {
+ ready = true;
+ notify();
+ }
+ }
+
+ @Override
+ public void onPeerToPeerServerIssue(Throwable e) {
+ }
+ }
+}
diff --git a/src/test/mainFile b/src/test/mainFile
new file mode 100644
index 0000000..ba2906d
--- /dev/null
+++ b/src/test/mainFile
@@ -0,0 +1 @@
+main
diff --git a/src/test/tracker b/src/test/tracker
new file mode 100644
index 0000000..05a682b
--- /dev/null
+++ b/src/test/tracker
@@ -0,0 +1 @@
+Hello!
\ No newline at end of file