From 1e89abff1548080c3d13a0e8d4468edd87336a12 Mon Sep 17 00:00:00 2001 From: Kresten Date: Tue, 26 May 2015 16:48:08 +0200 Subject: [PATCH 01/18] i done stuff plz halp test --- src/ChordNameService.java | 66 ++++++++++++ src/ChordNameServiceImpl.java | 192 +++++++++++++++++++++++++++++++++ src/DistributedTextEditor.java | 123 +++++++++++---------- src/JoinEvent.java | 23 ++++ src/JupiterSynchronizer.java | 2 - src/Transformer.java | 4 + 6 files changed, 346 insertions(+), 64 deletions(-) create mode 100644 src/ChordNameService.java create mode 100644 src/ChordNameServiceImpl.java create mode 100644 src/JoinEvent.java diff --git a/src/ChordNameService.java b/src/ChordNameService.java new file mode 100644 index 0000000..14605d9 --- /dev/null +++ b/src/ChordNameService.java @@ -0,0 +1,66 @@ +import java.net.InetSocketAddress; + +/** + * + * Interface for the Chord naming service. Each peer is named by an IP + * address and a port, technically an InetSocketAddress. Each + * InetSocketAddress is mapped into a key, an unsigned 31-bit integer, + * by taking hash=InetSocketAddress.hashCode() and letting key = + * abs(hash*1073741651 % 2147483647), where abs() is absolute value. + * This key is used to arrange all InetSocketAddress's into a ring + * with the current peers being responsible for each their interval of + * the key space, according to the Chord network topology. The + * interface allows to enter and leave a chord group and allows to + * find the name of a peer currently responsible for a given key. + */ + + +public interface ChordNameService extends Runnable { + + /** + * Compute the key of a given name. Returns a positive 31-bit + * integer, hased as to be "random looking" even for similar + * names. + */ + public int keyOfName(InetSocketAddress name); + + /** + * Used by the first group member. Specifies the port on + * which this founding peer is listening for new peers to join + * or leave. The name of the foudning peer is its local IP + * address and the given port. Its key is derived from the + * name using the method described above. + */ + public void createGroup(); + + /** + * Used to join a Chord group. This takes place by contacting + * one of the existing peers of the Chord group. The new peer + * has the name specified by the local IP address and the + * given port. The key of the new peer is derived from its + * name using the method described above. + * + * @param knownPeer The IP address and port of the known peer. + */ + public void joinGroup(InetSocketAddress knownPeer); + + /** + * Returns the name of this peer. May only be called after a + * group has been formed or joined. + */ + public InetSocketAddress getChordName(); + + /** + * Makes this instance of ChordNameService leave the peer + * group. The other peers should be informed of this and the + * Chord network updated appropriately. + */ + public void leaveGroup(); + + /** + * Starts the thread which manages this peers participation in + * the Chord network. + */ + public void run(); + +} \ No newline at end of file diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java new file mode 100644 index 0000000..3a12c7a --- /dev/null +++ b/src/ChordNameServiceImpl.java @@ -0,0 +1,192 @@ +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; + +public class ChordNameServiceImpl extends Thread implements ChordNameService { + + private DistributedTextEditor dte; + private boolean joining; + private int port; + protected InetSocketAddress myName; + protected int myKey; + private InetSocketAddress suc; + private InetSocketAddress pre; + private InetSocketAddress connectedAt; + private ServerSocket serverSocket; + private Socket joiningSocket; + private Socket preSocket; + private Socket nextSocket; + private boolean active; + + public ChordNameServiceImpl(InetSocketAddress myName, DistributedTextEditor dte){ + this.myName = myName; + this.port = myName.getPort(); + this.dte = dte; + } + + public int keyOfName(InetSocketAddress name) { + int tmp = name.hashCode()*1073741651 % 2147483647; + if (tmp < 0) { tmp = -tmp; } + return tmp; + } + + public InetSocketAddress getChordName() { + return myName; + } + + public void createGroup() { + joining = false; + active = true; + myKey = keyOfName(myName); + start(); + this.suc = getChordName(); + this.pre = getChordName(); + } + + public void joinGroup(InetSocketAddress knownPeer) { + joining = true; + active = true; + this.port = knownPeer.getPort(); + connectedAt = knownPeer; + myKey = keyOfName(myName); + this.pre = knownPeer; + start(); + } + + public void leaveGroup() { + active = false; + } + + public void run() { + System.out.println("My name is " + myName + " and my key is " + myKey); + if(joining){ + //Create socket for connection with a listening DistributedTextEditor + try{ + preSocket = new Socket(connectedAt.getAddress(), connectedAt.getPort()); + } catch (IOException e1) { + e1.printStackTrace(); + } + squeezeIn(preSocket); + dte.setTitle("Connected to " + connectedAt); + dte.newEventPlayer(nextSocket, myKey); + dte.newEventReplayer(preSocket, myKey); + } + + dte.setTitle("I'm listening on " + myName.getAddress()+":"+myName.getPort()); + try { + serverSocket = new ServerSocket(port); + } catch (IOException e) { + e.printStackTrace(); + } + while(active) { + //Create server socket and listen until another DistributedTextEditor connects + try { + joiningSocket = serverSocket.accept(); + acceptNode(joiningSocket); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + + /* + * If joining we should now enter the existing group and + * should at some point register this peer on its port if not + * already done and start listening for incoming connection + * from other peers who want to enter or leave the + * group. After leaveGroup() was called, the run() method + * should return so that the thread running it might + * terminate. + */ + } + + public void squeezeIn(Socket preSocket){ + JoinEvent je = null; + try { + ObjectOutputStream out = new ObjectOutputStream(preSocket.getOutputStream()); + je = new JoinEvent(myName, Role.NEXT); + out.writeObject(je); + ObjectInputStream in = new ObjectInputStream(preSocket.getInputStream()); + while ((je = (JoinEvent) in.readObject()) != null) { + //First time around, the listener returns himself as his own successor + if(je.getName().equals(connectedAt)){ + suc = pre; + nextSocket = preSocket; + } + else if(je.getRole().equals(Role.NEXT)){ + suc=je.getName(); + try{ + nextSocket = new Socket(suc.getAddress(), suc.getPort()); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + out.close(); + in.close(); + break; + } + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + } + } + + public void acceptNode(Socket joiningSocket){ + JoinEvent je = null; + JoinEvent sucje = null; + JoinEvent joiningje = null; + InetSocketAddress newGuy = null; + try { + ObjectInputStream in = new ObjectInputStream(joiningSocket.getInputStream()); + ObjectOutputStream joiningOut = new ObjectOutputStream(joiningSocket.getOutputStream()); + while ((je = (JoinEvent) in.readObject()) != null) { + if(je.getRole().equals(Role.NEXT)){ + newGuy = je.getName(); + //first time listener has himself as suc and pre + if(pre.equals(suc)){ + //send self to joining node + joiningje = new JoinEvent(suc, Role.NEXT); + joiningOut.writeObject(joiningje); + //joining node is now pre and suc, EventPlayer and EventReplayer is spawned + preSocket = joiningSocket; + nextSocket = joiningSocket; + pre = newGuy; + suc = newGuy; + dte.newEventPlayer(nextSocket, myKey); + dte.newEventReplayer(preSocket, myKey); + } + else { + ObjectOutputStream sucOut = new ObjectOutputStream(nextSocket.getOutputStream()); + //send new node to own successor + sucje = new JoinEvent(newGuy, Role.PREVIOUS); + sucOut.writeObject(sucje); + //send own successor to joining node + joiningje = new JoinEvent(suc, Role.NEXT); + joiningOut.writeObject(joiningje); + //dræb eventplayer, andens EventReplayer, mÃ¥ske et DisconnectEvent??? + dte.disconnect(); + //VED IKKE MED DEN HER + while (!nextSocket.isClosed()) {} + suc = newGuy; + nextSocket = joiningSocket; + dte.newEventPlayer(nextSocket, myKey); + sucOut.close(); + joiningOut.close(); + } + in.close(); + } + else {//receive new node as new predecessor (je.getRole().equals(Role.PREVIOUS)) + pre = je.getName(); + preSocket.close(); + preSocket = new Socket(pre.getAddress(),pre.getPort()); + dte.newEventReplayer(preSocket, myKey); + } + break; + } + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + } + } + +} \ No newline at end of file diff --git a/src/DistributedTextEditor.java b/src/DistributedTextEditor.java index 9710264..9c75fa0 100644 --- a/src/DistributedTextEditor.java +++ b/src/DistributedTextEditor.java @@ -4,10 +4,7 @@ import java.io.*; import javax.swing.*; import javax.swing.text.*; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.UnknownHostException; +import java.net.*; import java.util.concurrent.CopyOnWriteArrayList; public class DistributedTextEditor extends JFrame { @@ -23,8 +20,6 @@ public class DistributedTextEditor extends JFrame { private boolean changed = false; private DocumentEventCapturer dec = new DocumentEventCapturer(); - private Socket socket; - private ServerSocket serverSocket; JupiterSynchronizer jupiterSynchronizer = new JupiterSynchronizer(); @@ -86,66 +81,67 @@ public void keyPressed(KeyEvent e) { } }; + /** + * Computes the name of this peer by resolving the local host name + * and adding the current portname. + */ + protected InetSocketAddress _getMyName() { + try { + InetAddress localhost = InetAddress.getLocalHost(); + InetSocketAddress name = new InetSocketAddress(localhost, Integer.parseInt(portNumber.getText())); + return name; + } catch (UnknownHostException e) { + System.err.println("Cannot resolve the Internet address of the local host."); + System.err.println(e); + } + return null; + } + Action Listen = new AbstractAction("Listen") { public void actionPerformed(ActionEvent e) { - //Prepare editor for connection - saveOld(); - area1.setText(""); - changed = false; - Save.setEnabled(false); - SaveAs.setEnabled(false); - - //Get own address for hosting - String address = null; - try { - address = InetAddress.getLocalHost().getHostAddress(); - } - catch (UnknownHostException e1) { - e1.printStackTrace(); - } - int port = Integer.parseInt(portNumber.getText()); - setTitle("I'm listening on "+address + ":" + port); - - //Create server socket and listen until another DistributedTextEditor connects - try { - serverSocket = new ServerSocket(port); - socket = serverSocket.accept(); - serverSocket.close(); - } catch (IOException e1) { - e1.printStackTrace(); - } - //Create threads for sending and receiving text - establishConnection(socket, dec); - setTitle("Connected to " + socket.getInetAddress().toString() + ":" + + socket.getPort()); + listen(); } }; + public void listen(){ + //Prepare editor for connection + saveOld(); + area1.setText(""); + changed = false; + Save.setEnabled(false); + SaveAs.setEnabled(false); + Connect.setEnabled(false); + InetSocketAddress name = _getMyName(); + + ChordNameService chordNameService = new ChordNameServiceImpl(name, this); + chordNameService.createGroup(); + } + Action Connect = new AbstractAction("Connect") { public void actionPerformed(ActionEvent e) { - //Prepare editor for connection - saveOld(); - area1.setText(""); - changed = false; - Save.setEnabled(false); - SaveAs.setEnabled(false); + connect(); + } + }; - //Connect with a listening DistributedTextEditor - String address = ipaddress.getText(); - int port = Integer.parseInt(portNumber.getText()); - setTitle("Connecting to " + address +":"+ port + "..."); + public void connect(){ + //Prepare editor for connection + saveOld(); + area1.setText(""); + changed = false; + Save.setEnabled(false); + SaveAs.setEnabled(false); + Listen.setEnabled(false); - //Create socket for connection with a listening DistributedTextEditor - try{ - socket = new Socket(address, port); - } catch (IOException e1) { - e1.printStackTrace(); - } - //Create threads for sending and receiving text - establishConnection(socket, dec); - setTitle("Connected to " + address + ":" + port); + //Connect with a listening DistributedTextEditor + String address = ipaddress.getText(); + int port = Integer.parseInt(portNumber.getText()); + InetSocketAddress knownPeer = new InetSocketAddress(address, port); - } - }; + InetSocketAddress name = _getMyName(); + + ChordNameService chordNameService = new ChordNameServiceImpl(name, this); + chordNameService.joinGroup(knownPeer); + } //Disconnect method for this DistributedTextEditor Action Disconnect = new AbstractAction("Disconnect") { @@ -224,17 +220,20 @@ private void saveFile(String fileName) { } } - private void establishConnection(Socket socket, DocumentEventCapturer dec) { - //give threads a number, so we know which was first (most important) - int id = (int) (100 * Math.random()); + public void newEventPlayer(Socket socket, int id){ + EventPlayer ep = new EventPlayer(socket, dec, this, id, jupiterSynchronizer); + Thread ept = new Thread(ep); + ept.start(); + } + public void newEventReplayer(Socket socket, int id){ EventReplayer er = new EventReplayer(socket, area1, this, jupiterSynchronizer); Thread ert = new Thread(er); ert.start(); + } - EventPlayer ep = new EventPlayer(socket, dec, this, id, jupiterSynchronizer); - Thread ept = new Thread(ep); - ept.start(); + public JTextArea getArea1(){ + return area1; } public static void main(String[] arg) { diff --git a/src/JoinEvent.java b/src/JoinEvent.java new file mode 100644 index 0000000..9e81d5d --- /dev/null +++ b/src/JoinEvent.java @@ -0,0 +1,23 @@ +import java.io.Serializable; +import java.net.InetSocketAddress; + +enum Role{ + PREVIOUS, NEXT +} +public class JoinEvent implements Serializable { + private final InetSocketAddress name; + private final Role role; + + public JoinEvent(InetSocketAddress name, Role role){ + this.name = name; + this.role = role; + } + + public InetSocketAddress getName() { + return name; + } + + public Role getRole() { + return role; + } +} diff --git a/src/JupiterSynchronizer.java b/src/JupiterSynchronizer.java index b5dd9a4..27dce70 100644 --- a/src/JupiterSynchronizer.java +++ b/src/JupiterSynchronizer.java @@ -39,8 +39,6 @@ public synchronized MyTextEvent receive(MyTextEvent mte){ //{msg, outgoing[i]} = xform(msg, outgoing[i]); MyTextEvent[] xformed = Transformer.xform(mte, iterator.next()); mte = xformed[0]; - xformed[1].setLocalTime(iterator.next().getLocalTime()); - xformed[1].setOtherTime(iterator.next().getOtherTime()); outgoing.set(i, xformed[1]); i++; } diff --git a/src/Transformer.java b/src/Transformer.java index 07be675..cbdb28a 100644 --- a/src/Transformer.java +++ b/src/Transformer.java @@ -6,6 +6,8 @@ public class Transformer { public static MyTextEvent[] xform(MyTextEvent received, MyTextEvent local) { pair[0] = received; pair[1] = local; + int localMyMsgs = local.getLocalTime(); + int localOtherMsgs = local.getOtherTime(); if(received instanceof TextInsertEvent && local instanceof TextInsertEvent){ TextInsertEvent receivedIns = (TextInsertEvent) received; TextInsertEvent localIns = (TextInsertEvent) local; @@ -26,6 +28,8 @@ else if(received instanceof TextRemoveEvent && local instanceof TextInsertEvent) TextInsertEvent localIns = (TextInsertEvent) local; pair = removeInsert(receivedRem, localIns); } + pair[1].setLocalTime(localMyMsgs); + pair[1].setOtherTime(localOtherMsgs); return pair; } From 2e9cad77aefbc8180c3983b3bd41e6885bb79467 Mon Sep 17 00:00:00 2001 From: Kresten Date: Tue, 26 May 2015 16:59:23 +0200 Subject: [PATCH 02/18] we try again --- src/ChordNameServiceImpl.java | 5 ----- src/JupiterSynchronizer.java | 6 +++++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index 3a12c7a..9948218 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -123,8 +123,6 @@ else if(je.getRole().equals(Role.NEXT)){ e1.printStackTrace(); } } - out.close(); - in.close(); break; } } catch (IOException | ClassNotFoundException e) { @@ -171,10 +169,7 @@ public void acceptNode(Socket joiningSocket){ suc = newGuy; nextSocket = joiningSocket; dte.newEventPlayer(nextSocket, myKey); - sucOut.close(); - joiningOut.close(); } - in.close(); } else {//receive new node as new predecessor (je.getRole().equals(Role.PREVIOUS)) pre = je.getName(); diff --git a/src/JupiterSynchronizer.java b/src/JupiterSynchronizer.java index 27dce70..9cda49c 100644 --- a/src/JupiterSynchronizer.java +++ b/src/JupiterSynchronizer.java @@ -3,7 +3,11 @@ import java.util.concurrent.CopyOnWriteArrayList; /** - * Created by Kresten on 12-05-2015. + * Based on the article + * High-Latency, Low-Bandwidth Windowing in the Jupiter Collaboration System + * David A. Nichols, Pavel Curtis, + * Michael Dixon, and John Lamping + * Xerox PARC */ public class JupiterSynchronizer { From 64ca95c2db3f88949feb1fbaef42d416e7e6ca03 Mon Sep 17 00:00:00 2001 From: Kresten Date: Tue, 26 May 2015 23:14:35 +0200 Subject: [PATCH 03/18] =?UTF-8?q?Har=20fjernet=20blokeringen=20i=20DEC,=20?= =?UTF-8?q?s=C3=A5=20nu=20looper=20et=20event=20i=20de=20forbundne=20DTE'e?= =?UTF-8?q?r?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ChordNameServiceImpl.java | 3 ++- src/DistributedTextEditor.java | 2 -- src/DocumentEventCapturer.java | 21 ++++----------------- 3 files changed, 6 insertions(+), 20 deletions(-) diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index 9948218..8592790 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -41,9 +41,9 @@ public void createGroup() { joining = false; active = true; myKey = keyOfName(myName); - start(); this.suc = getChordName(); this.pre = getChordName(); + start(); } public void joinGroup(InetSocketAddress knownPeer) { @@ -90,6 +90,7 @@ public void run() { e1.printStackTrace(); } } + //leaveGroup sætter active=false, så her skal lukkes sockets og gøres rent /* * If joining we should now enter the existing group and diff --git a/src/DistributedTextEditor.java b/src/DistributedTextEditor.java index 9c75fa0..f6ee100 100644 --- a/src/DistributedTextEditor.java +++ b/src/DistributedTextEditor.java @@ -5,7 +5,6 @@ import javax.swing.*; import javax.swing.text.*; import java.net.*; -import java.util.concurrent.CopyOnWriteArrayList; public class DistributedTextEditor extends JFrame { @@ -77,7 +76,6 @@ public void keyPressed(KeyEvent e) { changed = true; Save.setEnabled(true); SaveAs.setEnabled(true); - dec.toggleMakeEvents(true); } }; diff --git a/src/DocumentEventCapturer.java b/src/DocumentEventCapturer.java index c32eeee..d3901f7 100644 --- a/src/DocumentEventCapturer.java +++ b/src/DocumentEventCapturer.java @@ -25,7 +25,6 @@ public class DocumentEventCapturer extends DocumentFilter { * we want, as we then don't need to keep asking until there are new elements. */ protected LinkedBlockingQueue eventHistory = new LinkedBlockingQueue(); - private boolean makeEvents; /** * If the queue is empty, then the call will block until an element arrives. @@ -42,10 +41,7 @@ public void insertString(FilterBypass fb, int offset, throws BadLocationException { /* Queue a copy of the event and then modify the textarea */ - if(makeEvents) { - eventHistory.add(new TextInsertEvent(offset, str)); - toggleMakeEvents(false); - } + eventHistory.add(new TextInsertEvent(offset, str)); super.insertString(fb, offset, str, a); } @@ -53,10 +49,7 @@ public void insertString(FilterBypass fb, int offset, public void remove(FilterBypass fb, int offset, int length) throws BadLocationException { /* Queue a copy of the event and then modify the textarea */ - if(makeEvents) { - eventHistory.add(new TextRemoveEvent(offset, length)); - toggleMakeEvents(false); - } + eventHistory.add(new TextRemoveEvent(offset, length)); super.remove(fb, offset, length); } @@ -66,16 +59,10 @@ public void replace(FilterBypass fb, int offset, throws BadLocationException { /* Queue a copy of the event and then modify the text */ - if (makeEvents) { - if (length > 0) { - eventHistory.add(new TextRemoveEvent(offset, length)); - } + if (length > 0) { + eventHistory.add(new TextRemoveEvent(offset, length)); eventHistory.add(new TextInsertEvent(offset, str)); - toggleMakeEvents(false); } super.replace(fb, offset, length, str, a); } - public void toggleMakeEvents(Boolean bool){ - makeEvents = bool; - } } \ No newline at end of file From e0732795a90e537b1793e21c3e765d0f044e803f Mon Sep 17 00:00:00 2001 From: Kresten Date: Wed, 27 May 2015 00:24:33 +0200 Subject: [PATCH 04/18] =?UTF-8?q?Har=20kigget=20lidt=20i=20Chord.=20LeaveG?= =?UTF-8?q?roup=20fungerer=20ikke=20nu,=20men=20id=C3=A9en=20er=20der.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ChordNameServiceImpl.java | 74 +++++++++++++++++++++------------- src/DistributedTextEditor.java | 16 +++----- src/EventReplayer.java | 8 +--- src/JoinEvent.java | 2 +- 4 files changed, 56 insertions(+), 44 deletions(-) diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index 8592790..10568e3 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -18,7 +18,7 @@ public class ChordNameServiceImpl extends Thread implements ChordNameService { private ServerSocket serverSocket; private Socket joiningSocket; private Socket preSocket; - private Socket nextSocket; + private Socket sucSocket; private boolean active; public ChordNameServiceImpl(InetSocketAddress myName, DistributedTextEditor dte){ @@ -71,7 +71,7 @@ public void run() { } squeezeIn(preSocket); dte.setTitle("Connected to " + connectedAt); - dte.newEventPlayer(nextSocket, myKey); + dte.newEventPlayer(sucSocket, myKey); dte.newEventReplayer(preSocket, myKey); } @@ -85,11 +85,26 @@ public void run() { //Create server socket and listen until another DistributedTextEditor connects try { joiningSocket = serverSocket.accept(); - acceptNode(joiningSocket); + processNode(joiningSocket); } catch (IOException e1) { e1.printStackTrace(); } } + //we want to leave the chord + JoinEvent je = null; + try { + //Send own suc to pre, so it knows which to connect on + ObjectOutputStream preout = new ObjectOutputStream(preSocket.getOutputStream()); + je = new JoinEvent(suc, Role.ABORTSUCCESSOR); + preout.writeObject(je); + //We cut connection with own suc + dte.disconnect(); + //DETTE KUNNE VIRKE, hvis pre hele tiden så hvad der kommer ind i streamen, altså altid er i processNode. Det er den bare ikke :((( + + } catch (IOException e) { + e.printStackTrace(); + } + //leaveGroup sætter active=false, så her skal lukkes sockets og gøres rent /* @@ -107,22 +122,18 @@ public void squeezeIn(Socket preSocket){ JoinEvent je = null; try { ObjectOutputStream out = new ObjectOutputStream(preSocket.getOutputStream()); - je = new JoinEvent(myName, Role.NEXT); + je = new JoinEvent(myName, Role.SUCCESSOR); out.writeObject(je); ObjectInputStream in = new ObjectInputStream(preSocket.getInputStream()); while ((je = (JoinEvent) in.readObject()) != null) { //First time around, the listener returns himself as his own successor if(je.getName().equals(connectedAt)){ suc = pre; - nextSocket = preSocket; + sucSocket = preSocket; } - else if(je.getRole().equals(Role.NEXT)){ + else if(je.getRole().equals(Role.SUCCESSOR)){ suc=je.getName(); - try{ - nextSocket = new Socket(suc.getAddress(), suc.getPort()); - } catch (IOException e1) { - e1.printStackTrace(); - } + sucSocket = new Socket(suc.getAddress(), suc.getPort()); } break; } @@ -131,50 +142,59 @@ else if(je.getRole().equals(Role.NEXT)){ } } - public void acceptNode(Socket joiningSocket){ - JoinEvent je = null; - JoinEvent sucje = null; - JoinEvent joiningje = null; - InetSocketAddress newGuy = null; + public void processNode(Socket joiningSocket){ + JoinEvent je; + JoinEvent sucje; + JoinEvent joiningje; + InetSocketAddress newGuy; try { ObjectInputStream in = new ObjectInputStream(joiningSocket.getInputStream()); ObjectOutputStream joiningOut = new ObjectOutputStream(joiningSocket.getOutputStream()); while ((je = (JoinEvent) in.readObject()) != null) { - if(je.getRole().equals(Role.NEXT)){ + if(je.getRole().equals(Role.SUCCESSOR)){ newGuy = je.getName(); //first time listener has himself as suc and pre if(pre.equals(suc)){ //send self to joining node - joiningje = new JoinEvent(suc, Role.NEXT); + joiningje = new JoinEvent(suc, Role.SUCCESSOR); joiningOut.writeObject(joiningje); //joining node is now pre and suc, EventPlayer and EventReplayer is spawned preSocket = joiningSocket; - nextSocket = joiningSocket; + sucSocket = joiningSocket; pre = newGuy; suc = newGuy; - dte.newEventPlayer(nextSocket, myKey); + dte.newEventPlayer(sucSocket, myKey); dte.newEventReplayer(preSocket, myKey); } else { - ObjectOutputStream sucOut = new ObjectOutputStream(nextSocket.getOutputStream()); + ObjectOutputStream sucOut = new ObjectOutputStream(sucSocket.getOutputStream()); //send new node to own successor - sucje = new JoinEvent(newGuy, Role.PREVIOUS); + sucje = new JoinEvent(newGuy, Role.PREDECESSOR); sucOut.writeObject(sucje); //send own successor to joining node - joiningje = new JoinEvent(suc, Role.NEXT); + joiningje = new JoinEvent(suc, Role.SUCCESSOR); joiningOut.writeObject(joiningje); //dræb eventplayer, andens EventReplayer, mÃ¥ske et DisconnectEvent??? dte.disconnect(); //VED IKKE MED DEN HER - while (!nextSocket.isClosed()) {} + while (!sucSocket.isClosed()) {} suc = newGuy; - nextSocket = joiningSocket; - dte.newEventPlayer(nextSocket, myKey); + sucSocket = joiningSocket; + dte.newEventPlayer(sucSocket, myKey); } } + else if (je.getRole().equals(Role.ABORTSUCCESSOR)){ + //Vi kommer aldrig herned :( find en løsning + //node is leaving, so we need new successor + suc = je.getName(); + //cut connection to old suc + dte.disconnect(); + sucSocket = new Socket(suc.getAddress(), suc.getPort()); + } else {//receive new node as new predecessor (je.getRole().equals(Role.PREVIOUS)) pre = je.getName(); - preSocket.close(); + //ved ikke med close, det burde den gamle predecessors disconnect have taget sig af + //preSocket.close(); preSocket = new Socket(pre.getAddress(),pre.getPort()); dte.newEventReplayer(preSocket, myKey); } diff --git a/src/DistributedTextEditor.java b/src/DistributedTextEditor.java index f6ee100..f978d93 100644 --- a/src/DistributedTextEditor.java +++ b/src/DistributedTextEditor.java @@ -20,7 +20,9 @@ public class DistributedTextEditor extends JFrame { private DocumentEventCapturer dec = new DocumentEventCapturer(); - JupiterSynchronizer jupiterSynchronizer = new JupiterSynchronizer(); + private JupiterSynchronizer jupiterSynchronizer = new JupiterSynchronizer(); + + private ChordNameService chordNameService; public DistributedTextEditor() { area1.setFont(new Font("Monospaced",Font.PLAIN,12)); @@ -111,7 +113,7 @@ public void listen(){ Connect.setEnabled(false); InetSocketAddress name = _getMyName(); - ChordNameService chordNameService = new ChordNameServiceImpl(name, this); + chordNameService = new ChordNameServiceImpl(name, this); chordNameService.createGroup(); } @@ -137,20 +139,14 @@ public void connect(){ InetSocketAddress name = _getMyName(); - ChordNameService chordNameService = new ChordNameServiceImpl(name, this); + chordNameService = new ChordNameServiceImpl(name, this); chordNameService.joinGroup(knownPeer); } //Disconnect method for this DistributedTextEditor Action Disconnect = new AbstractAction("Disconnect") { public void actionPerformed(ActionEvent e) { - try { - DisconnectEvent disconnectEvent = new DisconnectEvent(0); - disconnectEvent.setShouldDisconnect(); - dec.eventHistory.put(disconnectEvent); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } + chordNameService.leaveGroup(); setTitle("Disconnected"); jupiterSynchronizer.clear(); } diff --git a/src/EventReplayer.java b/src/EventReplayer.java index 0bfdca4..92a27e9 100644 --- a/src/EventReplayer.java +++ b/src/EventReplayer.java @@ -68,12 +68,8 @@ public void run() { }); } else if (mte instanceof DisconnectEvent) { terminate(); - //The DisconnectEvent send first should make own client send a DisconnectEvent and then close itself - if(((DisconnectEvent) mte).shouldDisconnect()) { - distributedTextEditor.disconnect(); - } - //Second and last DisconnectEvent should close socket - else socket.close(); + //The DisconnectEvent should close this connection + socket.close(); break; } } diff --git a/src/JoinEvent.java b/src/JoinEvent.java index 9e81d5d..b5d2e43 100644 --- a/src/JoinEvent.java +++ b/src/JoinEvent.java @@ -2,7 +2,7 @@ import java.net.InetSocketAddress; enum Role{ - PREVIOUS, NEXT + PREDECESSOR, SUCCESSOR, ABORTSUCCESSOR } public class JoinEvent implements Serializable { private final InetSocketAddress name; From 17d516ec2ed85782510a35c6c3acd107a8cbe0e5 Mon Sep 17 00:00:00 2001 From: Kresten Date: Wed, 27 May 2015 18:36:16 +0200 Subject: [PATCH 05/18] =?UTF-8?q?Tr=C3=A5d=20til=20at=20lytte=20efter=20di?= =?UTF-8?q?sco=20i=20tr=C3=A6sko?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ChordNameServiceImpl.java | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index 10568e3..f872236 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -99,13 +99,13 @@ public void run() { preout.writeObject(je); //We cut connection with own suc dte.disconnect(); - //DETTE KUNNE VIRKE, hvis pre hele tiden så hvad der kommer ind i streamen, altså altid er i processNode. Det er den bare ikke :((( + //DETTE KUNNE VIRKE, hvis pre hele tiden s� hvad der kommer ind i streamen, alts� altid er i processNode. Det er den bare ikke :((( } catch (IOException e) { e.printStackTrace(); } - //leaveGroup sætter active=false, så her skal lukkes sockets og gøres rent + //leaveGroup s�tter active=false, s� her skal lukkes sockets og g�res rent /* * If joining we should now enter the existing group and @@ -181,16 +181,31 @@ public void processNode(Socket joiningSocket){ suc = newGuy; sucSocket = joiningSocket; dte.newEventPlayer(sucSocket, myKey); + Runnable discoStream = new Runnable() { + @Override + public void run() { + ObjectInputStream discoIn = null; + JoinEvent discoje = null; + try { + discoIn = new ObjectInputStream(sucSocket.getInputStream()); + while ((discoje = (JoinEvent) discoIn.readObject()) != null) { + if(discoje.getRole().equals(Role.ABORTSUCCESSOR)){ + //node is leaving, so we need new successor + suc = je.getName(); + //cut connection to old suc + dte.disconnect(); + sucSocket = new Socket(suc.getAddress(), suc.getPort()); + break; + } + } + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + } + } + }; + discoStream.run(); } } - else if (je.getRole().equals(Role.ABORTSUCCESSOR)){ - //Vi kommer aldrig herned :( find en løsning - //node is leaving, so we need new successor - suc = je.getName(); - //cut connection to old suc - dte.disconnect(); - sucSocket = new Socket(suc.getAddress(), suc.getPort()); - } else {//receive new node as new predecessor (je.getRole().equals(Role.PREVIOUS)) pre = je.getName(); //ved ikke med close, det burde den gamle predecessors disconnect have taget sig af From ce96ccb64d95e2213d7e9a2a98e82781a6701bf4 Mon Sep 17 00:00:00 2001 From: Kresten Date: Wed, 27 May 2015 18:38:16 +0200 Subject: [PATCH 06/18] minor error sry --- src/ChordNameServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index f872236..2f09b5e 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -191,7 +191,7 @@ public void run() { while ((discoje = (JoinEvent) discoIn.readObject()) != null) { if(discoje.getRole().equals(Role.ABORTSUCCESSOR)){ //node is leaving, so we need new successor - suc = je.getName(); + suc = discoje.getName(); //cut connection to old suc dte.disconnect(); sucSocket = new Socket(suc.getAddress(), suc.getPort()); From e370c60683a0d5e5998592dd15315b88902f6b54 Mon Sep 17 00:00:00 2001 From: Kresten Date: Wed, 27 May 2015 20:24:17 +0200 Subject: [PATCH 07/18] DO OVER --- src/ChordNameServiceImpl.java | 150 +++------------------------------- src/JoinEvent.java | 2 +- 2 files changed, 12 insertions(+), 140 deletions(-) diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index 2f09b5e..099562a 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -8,7 +8,6 @@ public class ChordNameServiceImpl extends Thread implements ChordNameService { private DistributedTextEditor dte; - private boolean joining; private int port; protected InetSocketAddress myName; protected int myKey; @@ -20,6 +19,7 @@ public class ChordNameServiceImpl extends Thread implements ChordNameService { private Socket preSocket; private Socket sucSocket; private boolean active; + private boolean first; public ChordNameServiceImpl(InetSocketAddress myName, DistributedTextEditor dte){ this.myName = myName; @@ -38,8 +38,8 @@ public InetSocketAddress getChordName() { } public void createGroup() { - joining = false; active = true; + first = true; myKey = keyOfName(myName); this.suc = getChordName(); this.pre = getChordName(); @@ -47,7 +47,6 @@ public void createGroup() { } public void joinGroup(InetSocketAddress knownPeer) { - joining = true; active = true; this.port = knownPeer.getPort(); connectedAt = knownPeer; @@ -62,50 +61,17 @@ public void leaveGroup() { public void run() { System.out.println("My name is " + myName + " and my key is " + myKey); - if(joining){ - //Create socket for connection with a listening DistributedTextEditor - try{ - preSocket = new Socket(connectedAt.getAddress(), connectedAt.getPort()); - } catch (IOException e1) { - e1.printStackTrace(); - } - squeezeIn(preSocket); - dte.setTitle("Connected to " + connectedAt); - dte.newEventPlayer(sucSocket, myKey); - dte.newEventReplayer(preSocket, myKey); - } - - dte.setTitle("I'm listening on " + myName.getAddress()+":"+myName.getPort()); - try { - serverSocket = new ServerSocket(port); - } catch (IOException e) { - e.printStackTrace(); - } while(active) { //Create server socket and listen until another DistributedTextEditor connects try { joiningSocket = serverSocket.accept(); - processNode(joiningSocket); + if(first){ + firstConnection(joiningSocket); + } } catch (IOException e1) { e1.printStackTrace(); } } - //we want to leave the chord - JoinEvent je = null; - try { - //Send own suc to pre, so it knows which to connect on - ObjectOutputStream preout = new ObjectOutputStream(preSocket.getOutputStream()); - je = new JoinEvent(suc, Role.ABORTSUCCESSOR); - preout.writeObject(je); - //We cut connection with own suc - dte.disconnect(); - //DETTE KUNNE VIRKE, hvis pre hele tiden s� hvad der kommer ind i streamen, alts� altid er i processNode. Det er den bare ikke :((( - - } catch (IOException e) { - e.printStackTrace(); - } - - //leaveGroup s�tter active=false, s� her skal lukkes sockets og g�res rent /* * If joining we should now enter the existing group and @@ -118,106 +84,12 @@ public void run() { */ } - public void squeezeIn(Socket preSocket){ - JoinEvent je = null; - try { - ObjectOutputStream out = new ObjectOutputStream(preSocket.getOutputStream()); - je = new JoinEvent(myName, Role.SUCCESSOR); - out.writeObject(je); - ObjectInputStream in = new ObjectInputStream(preSocket.getInputStream()); - while ((je = (JoinEvent) in.readObject()) != null) { - //First time around, the listener returns himself as his own successor - if(je.getName().equals(connectedAt)){ - suc = pre; - sucSocket = preSocket; - } - else if(je.getRole().equals(Role.SUCCESSOR)){ - suc=je.getName(); - sucSocket = new Socket(suc.getAddress(), suc.getPort()); - } - break; - } - } catch (IOException | ClassNotFoundException e) { - e.printStackTrace(); - } - } - - public void processNode(Socket joiningSocket){ - JoinEvent je; - JoinEvent sucje; - JoinEvent joiningje; - InetSocketAddress newGuy; - try { - ObjectInputStream in = new ObjectInputStream(joiningSocket.getInputStream()); - ObjectOutputStream joiningOut = new ObjectOutputStream(joiningSocket.getOutputStream()); - while ((je = (JoinEvent) in.readObject()) != null) { - if(je.getRole().equals(Role.SUCCESSOR)){ - newGuy = je.getName(); - //first time listener has himself as suc and pre - if(pre.equals(suc)){ - //send self to joining node - joiningje = new JoinEvent(suc, Role.SUCCESSOR); - joiningOut.writeObject(joiningje); - //joining node is now pre and suc, EventPlayer and EventReplayer is spawned - preSocket = joiningSocket; - sucSocket = joiningSocket; - pre = newGuy; - suc = newGuy; - dte.newEventPlayer(sucSocket, myKey); - dte.newEventReplayer(preSocket, myKey); - } - else { - ObjectOutputStream sucOut = new ObjectOutputStream(sucSocket.getOutputStream()); - //send new node to own successor - sucje = new JoinEvent(newGuy, Role.PREDECESSOR); - sucOut.writeObject(sucje); - //send own successor to joining node - joiningje = new JoinEvent(suc, Role.SUCCESSOR); - joiningOut.writeObject(joiningje); - //dræb eventplayer, andens EventReplayer, mÃ¥ske et DisconnectEvent??? - dte.disconnect(); - //VED IKKE MED DEN HER - while (!sucSocket.isClosed()) {} - suc = newGuy; - sucSocket = joiningSocket; - dte.newEventPlayer(sucSocket, myKey); - Runnable discoStream = new Runnable() { - @Override - public void run() { - ObjectInputStream discoIn = null; - JoinEvent discoje = null; - try { - discoIn = new ObjectInputStream(sucSocket.getInputStream()); - while ((discoje = (JoinEvent) discoIn.readObject()) != null) { - if(discoje.getRole().equals(Role.ABORTSUCCESSOR)){ - //node is leaving, so we need new successor - suc = discoje.getName(); - //cut connection to old suc - dte.disconnect(); - sucSocket = new Socket(suc.getAddress(), suc.getPort()); - break; - } - } - } catch (IOException | ClassNotFoundException e) { - e.printStackTrace(); - } - } - }; - discoStream.run(); - } - } - else {//receive new node as new predecessor (je.getRole().equals(Role.PREVIOUS)) - pre = je.getName(); - //ved ikke med close, det burde den gamle predecessors disconnect have taget sig af - //preSocket.close(); - preSocket = new Socket(pre.getAddress(),pre.getPort()); - dte.newEventReplayer(preSocket, myKey); - } - break; - } - } catch (IOException | ClassNotFoundException e) { - e.printStackTrace(); - } + private void firstConnection(Socket joiningSocket) { + preSocket = joiningSocket; + dte.newEventPlayer(preSocket, myKey); + sucSocket = joiningSocket; + dte.newEventReplayer(sucSocket, myKey); + first = false; } } \ No newline at end of file diff --git a/src/JoinEvent.java b/src/JoinEvent.java index b5d2e43..94b339b 100644 --- a/src/JoinEvent.java +++ b/src/JoinEvent.java @@ -2,7 +2,7 @@ import java.net.InetSocketAddress; enum Role{ - PREDECESSOR, SUCCESSOR, ABORTSUCCESSOR + PREDECESSOR, SUCCESSOR, ABORT } public class JoinEvent implements Serializable { private final InetSocketAddress name; From 220760fd90c9bacb9e7581d1966d140a3e119904 Mon Sep 17 00:00:00 2001 From: Kresten Date: Wed, 27 May 2015 22:41:33 +0200 Subject: [PATCH 08/18] Everything works --- src/ChordNameServiceImpl.java | 95 ++++++++++++++++++++-------------- src/DisconnectEvent.java | 25 ++++----- src/DisconnectThread.java | 41 +++++++++++++++ src/DistributedTextEditor.java | 37 ++++++------- src/EventPlayer.java | 16 ++++-- src/EventReplayer.java | 17 +++--- src/JoinEvent.java | 23 -------- src/ServerThread.java | 71 +++++++++++++++++++++++++ 8 files changed, 220 insertions(+), 105 deletions(-) create mode 100644 src/DisconnectThread.java delete mode 100644 src/JoinEvent.java create mode 100644 src/ServerThread.java diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index 099562a..723102d 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -5,7 +5,7 @@ import java.net.ServerSocket; import java.net.Socket; -public class ChordNameServiceImpl extends Thread implements ChordNameService { +public class ChordNameServiceImpl { private DistributedTextEditor dte; private int port; @@ -15,11 +15,29 @@ public class ChordNameServiceImpl extends Thread implements ChordNameService { private InetSocketAddress pre; private InetSocketAddress connectedAt; private ServerSocket serverSocket; - private Socket joiningSocket; - private Socket preSocket; - private Socket sucSocket; + + private Socket preSocket, sucSocket; + private DisconnectThread disconnectThread; + + public Socket getSucSocket() { + return sucSocket; + } + + public void setSucSocket(Socket sucSocket) { + this.sucSocket = sucSocket; + } + + public Socket getPreSocket() { + return preSocket; + } + + public void setPreSocket(Socket preSocket) { + this.preSocket = preSocket; + } + private boolean active; private boolean first; + private ServerThread serverThread; public ChordNameServiceImpl(InetSocketAddress myName, DistributedTextEditor dte){ this.myName = myName; @@ -37,42 +55,51 @@ public InetSocketAddress getChordName() { return myName; } - public void createGroup() { - active = true; - first = true; - myKey = keyOfName(myName); - this.suc = getChordName(); - this.pre = getChordName(); - start(); + public void createGroup(){ + serverThread = new ServerThread(dte,this); + new Thread(serverThread).run(); } public void joinGroup(InetSocketAddress knownPeer) { active = true; - this.port = knownPeer.getPort(); connectedAt = knownPeer; - myKey = keyOfName(myName); - this.pre = knownPeer; - start(); + + try { + // Setup successor + sucSocket = new Socket(knownPeer.getAddress(),port); + + // Start listening for disconnects from successor + disconnectThread = new DisconnectThread(dte,this,sucSocket); + new Thread(disconnectThread).run(); + + dte.newEventPlayer(sucSocket, myKey); + + // Wait for new predecessor + ServerSocket server = new ServerSocket(port); + preSocket = server.accept(); + dte.newEventReplayer(preSocket, myKey); + + // Keep listining for new joins + serverThread = new ServerThread(dte,this,server); + new Thread(serverThread).run(); + } catch (IOException e) { + e.printStackTrace(); + } } public void leaveGroup() { - active = false; - } + try { + ObjectOutputStream disconnectStream = new ObjectOutputStream(preSocket.getOutputStream()); + disconnectStream.writeObject(new DisconnectEvent(sucSocket.getInetAddress())); + preSocket.close(); + sucSocket.close(); - public void run() { - System.out.println("My name is " + myName + " and my key is " + myKey); - while(active) { - //Create server socket and listen until another DistributedTextEditor connects - try { - joiningSocket = serverSocket.accept(); - if(first){ - firstConnection(joiningSocket); - } - } catch (IOException e1) { - e1.printStackTrace(); - } + } catch (IOException e) { + e.printStackTrace(); } + } + /* * If joining we should now enter the existing group and * should at some point register this peer on its port if not @@ -82,14 +109,4 @@ public void run() { * should return so that the thread running it might * terminate. */ - } - - private void firstConnection(Socket joiningSocket) { - preSocket = joiningSocket; - dte.newEventPlayer(preSocket, myKey); - sucSocket = joiningSocket; - dte.newEventReplayer(sucSocket, myKey); - first = false; - } - } \ No newline at end of file diff --git a/src/DisconnectEvent.java b/src/DisconnectEvent.java index 2082329..6fa8a21 100644 --- a/src/DisconnectEvent.java +++ b/src/DisconnectEvent.java @@ -1,20 +1,17 @@ +import java.io.Serializable; +import java.net.InetAddress; + /** - * DisconnectEvent is a MyTextEvent used to let all the threads know - * when to close. It contains a boolean to let the threads close - * in the right order, and in the end close the socket. - * Created by Kresten Axelsen on 27-04-2015. + * Originally created by Kresten Axelsen on 27-04-3015, a day we will all remember. */ -public class DisconnectEvent extends MyTextEvent { - DisconnectEvent(int offset) { - super(offset); - } - private boolean shouldDisconnect; +public class DisconnectEvent implements Serializable { + InetAddress newSuccessor; - public boolean shouldDisconnect() { - return shouldDisconnect; + public DisconnectEvent(InetAddress successor) { + newSuccessor = successor; } - public void setShouldDisconnect() { - shouldDisconnect = true; + public InetAddress getNewSuccessor() { + return newSuccessor; } -} +} \ No newline at end of file diff --git a/src/DisconnectThread.java b/src/DisconnectThread.java new file mode 100644 index 0000000..4ef6d3c --- /dev/null +++ b/src/DisconnectThread.java @@ -0,0 +1,41 @@ +import java.io.IOException; +import java.io.ObjectInputStream; +import java.net.InetAddress; +import java.net.Socket; + +public class DisconnectThread implements Runnable { + + private Socket socket; + private final ChordNameServiceImpl cns; + private final DistributedTextEditor dte; + + public DisconnectThread(DistributedTextEditor dte, ChordNameServiceImpl cns, Socket predecessor) { + socket = predecessor; + this.cns = cns; + this.dte = dte; + } + + @Override + public void run() { + try { + while(true) { + ObjectInputStream disconnectStream = new ObjectInputStream(socket.getInputStream()); + DisconnectEvent de; + while ((de = (DisconnectEvent) disconnectStream.readObject()) != null) { + } + + InetAddress newSuccessor = de.getNewSuccessor(); + cns.setSucSocket(new Socket(newSuccessor, cns.getChordName().getPort())); + + dte.newEventPlayer(cns.getSucSocket(), cns.keyOfName(cns.getChordName())); + + // New successor + socket = cns.getSucSocket(); + } + } catch (IOException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + } +} diff --git a/src/DistributedTextEditor.java b/src/DistributedTextEditor.java index f978d93..71ec59c 100644 --- a/src/DistributedTextEditor.java +++ b/src/DistributedTextEditor.java @@ -22,7 +22,9 @@ public class DistributedTextEditor extends JFrame { private JupiterSynchronizer jupiterSynchronizer = new JupiterSynchronizer(); - private ChordNameService chordNameService; + private ChordNameServiceImpl chordNameService; + private EventPlayer ep; + private EventReplayer er; public DistributedTextEditor() { area1.setFont(new Font("Monospaced",Font.PLAIN,12)); @@ -152,17 +154,6 @@ public void actionPerformed(ActionEvent e) { } }; - //Disconnect method for the connected DistributedTextEditor - public void disconnect(){ - try { - dec.eventHistory.put(new DisconnectEvent(0)); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - setTitle("Disconnected"); - jupiterSynchronizer.clear(); - } - Action Save = new AbstractAction("Save") { public void actionPerformed(ActionEvent e) { if(!currentFile.equals("Untitled")) @@ -215,21 +206,31 @@ private void saveFile(String fileName) { } public void newEventPlayer(Socket socket, int id){ - EventPlayer ep = new EventPlayer(socket, dec, this, id, jupiterSynchronizer); - Thread ept = new Thread(ep); - ept.start(); + if (ep == null) { + ep = new EventPlayer(socket, dec, this, id, jupiterSynchronizer); + Thread ept = new Thread(ep); + ept.start(); + } else + ep.updateSocket(socket); } public void newEventReplayer(Socket socket, int id){ - EventReplayer er = new EventReplayer(socket, area1, this, jupiterSynchronizer); - Thread ert = new Thread(er); - ert.start(); + if(er == null) { + er = new EventReplayer(socket, area1, this, jupiterSynchronizer); + Thread ert = new Thread(er); + ert.start(); + } else + er.updateSocket(socket); } public JTextArea getArea1(){ return area1; } + public int getPort() { + return Integer.parseInt(portNumber.getText()); + } + public static void main(String[] arg) { new DistributedTextEditor(); } diff --git a/src/EventPlayer.java b/src/EventPlayer.java index de2af69..2ac449a 100644 --- a/src/EventPlayer.java +++ b/src/EventPlayer.java @@ -11,6 +11,7 @@ public class EventPlayer implements Runnable { private Socket socket; private DocumentEventCapturer dec; private boolean running = true; + private ObjectOutputStream out; public EventPlayer(Socket socket, DocumentEventCapturer dec, DistributedTextEditor distributedTextEditor, int id, JupiterSynchronizer jupiterSynchronizer) { this.dec = dec; @@ -20,19 +21,24 @@ public EventPlayer(Socket socket, DocumentEventCapturer dec, DistributedTextEdit this.jupiterSynchronizer = jupiterSynchronizer; } + public void updateSocket(Socket socket) { + this.socket = socket; + try { + out = new ObjectOutputStream(socket.getOutputStream()); + } catch (IOException e) { + e.printStackTrace(); + } + } + public void run() { try { - ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); + out = new ObjectOutputStream(socket.getOutputStream()); while (running) { //Take every MyTextEvent and send it to the connected DistributedTextEditor's EventReplayer MyTextEvent mte = dec.take(); mte = jupiterSynchronizer.generate(mte); mte.setId(id); out.writeObject(mte); - //If the MyTextEvent received is a DisconnectEvent, close the thread - if (mte instanceof DisconnectEvent) { - terminate(); - } } } catch (IOException | InterruptedException e) { e.printStackTrace(); diff --git a/src/EventReplayer.java b/src/EventReplayer.java index 92a27e9..11fe697 100644 --- a/src/EventReplayer.java +++ b/src/EventReplayer.java @@ -21,6 +21,7 @@ public class EventReplayer implements Runnable { private Socket socket; private JTextArea area; private boolean running = true; + private ObjectInputStream in; public EventReplayer(Socket socket, JTextArea area, DistributedTextEditor distributedTextEditor, JupiterSynchronizer jupiterSynchronizer) { @@ -30,10 +31,19 @@ public EventReplayer(Socket socket, JTextArea area, DistributedTextEditor distri this.jupiterSynchronizer = jupiterSynchronizer; } + public void updateSocket(Socket socket) { + this.socket = socket; + try { + in = new ObjectInputStream(socket.getInputStream()); + } catch (IOException e) { + e.printStackTrace(); + } + } + public void run() { try { while (running) { - ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); + in = new ObjectInputStream(socket.getInputStream()); MyTextEvent mte = null; try { while ((mte = (MyTextEvent) in.readObject()) != null) { @@ -66,11 +76,6 @@ public void run() { } } }); - } else if (mte instanceof DisconnectEvent) { - terminate(); - //The DisconnectEvent should close this connection - socket.close(); - break; } } } catch (Exception _) { diff --git a/src/JoinEvent.java b/src/JoinEvent.java deleted file mode 100644 index 94b339b..0000000 --- a/src/JoinEvent.java +++ /dev/null @@ -1,23 +0,0 @@ -import java.io.Serializable; -import java.net.InetSocketAddress; - -enum Role{ - PREDECESSOR, SUCCESSOR, ABORT -} -public class JoinEvent implements Serializable { - private final InetSocketAddress name; - private final Role role; - - public JoinEvent(InetSocketAddress name, Role role){ - this.name = name; - this.role = role; - } - - public InetSocketAddress getName() { - return name; - } - - public Role getRole() { - return role; - } -} diff --git a/src/ServerThread.java b/src/ServerThread.java new file mode 100644 index 0000000..2089e77 --- /dev/null +++ b/src/ServerThread.java @@ -0,0 +1,71 @@ +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; + +public class ServerThread implements Runnable { + + private final ChordNameServiceImpl cns; + private final int port; + private DistributedTextEditor dte; + private Socket joiningSocket; + private ServerSocket server; + + public ServerThread(DistributedTextEditor dte, ChordNameServiceImpl cns) { + this.dte = dte; + this.cns = cns; + port = cns.getChordName().getPort(); + } + + public ServerThread(DistributedTextEditor dte, ChordNameServiceImpl cns, ServerSocket ss) { + this.dte = dte; + this.cns = cns; + port = cns.getChordName().getPort(); + server = ss; + } + + @Override + public void run() { + int myKey = cns.keyOfName(cns.getChordName()); + + try { + // First join + if (server == null) { + server = new ServerSocket(port); + + joiningSocket = server.accept(); + cns.setPreSocket(joiningSocket); + dte.newEventReplayer(joiningSocket, myKey); + + cns.setSucSocket(new Socket(joiningSocket.getInetAddress(), port)); + dte.newEventPlayer(joiningSocket, myKey); + + DisconnectThread disconnectThread = new DisconnectThread(dte, cns, cns.getSucSocket()); + + joiningSocket = null; + } + + while(true) { + joiningSocket = server.accept(); + + Socket preSocket = cns.getPreSocket(); + + ObjectOutputStream disconnectStream = new ObjectOutputStream(preSocket.getOutputStream()); + + disconnectStream.writeObject(new DisconnectEvent(preSocket.getInetAddress())); + preSocket.close(); + + cns.setPreSocket(joiningSocket); + dte.newEventReplayer(preSocket, myKey); + + joiningSocket = null; + } + + } catch (IOException e) { + e.printStackTrace(); + } + } +} From 0ae19909821633927b18c7707639b8d375f59f48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20Malling=20=C3=98stergaard?= Date: Wed, 27 May 2015 23:13:57 +0200 Subject: [PATCH 09/18] Introduce ConnectEvent --- src/ConnectEvent.java | 7 +++++++ src/DisconnectThread.java | 18 +++++++++++------- src/ServerThread.java | 14 +++++++++++++- 3 files changed, 31 insertions(+), 8 deletions(-) create mode 100644 src/ConnectEvent.java diff --git a/src/ConnectEvent.java b/src/ConnectEvent.java new file mode 100644 index 0000000..e8a7030 --- /dev/null +++ b/src/ConnectEvent.java @@ -0,0 +1,7 @@ +import java.net.InetAddress; + +public class ConnectEvent { + + private boolean krestenInsists = true; + +} diff --git a/src/DisconnectThread.java b/src/DisconnectThread.java index 4ef6d3c..005bc0b 100644 --- a/src/DisconnectThread.java +++ b/src/DisconnectThread.java @@ -20,17 +20,21 @@ public void run() { try { while(true) { ObjectInputStream disconnectStream = new ObjectInputStream(socket.getInputStream()); - DisconnectEvent de; - while ((de = (DisconnectEvent) disconnectStream.readObject()) != null) { + Object de; + while ((de = disconnectStream.readObject()) != null) { } - InetAddress newSuccessor = de.getNewSuccessor(); - cns.setSucSocket(new Socket(newSuccessor, cns.getChordName().getPort())); + if (de instanceof DisconnectEvent) { + InetAddress newSuccessor = ((DisconnectEvent) de).getNewSuccessor(); + cns.setSucSocket(new Socket(newSuccessor, cns.getChordName().getPort())); - dte.newEventPlayer(cns.getSucSocket(), cns.keyOfName(cns.getChordName())); + dte.newEventPlayer(cns.getSucSocket(), cns.keyOfName(cns.getChordName())); - // New successor - socket = cns.getSucSocket(); + // New successor + socket = cns.getSucSocket(); + } else if (de instanceof ConnectEvent) { + cns.setSucSocket(cns.getPreSocket()); + } } } catch (IOException e) { e.printStackTrace(); diff --git a/src/ServerThread.java b/src/ServerThread.java index 2089e77..c2bb16c 100644 --- a/src/ServerThread.java +++ b/src/ServerThread.java @@ -8,6 +8,7 @@ public class ServerThread implements Runnable { + private String TAG = "ServerThread"; private final ChordNameServiceImpl cns; private final int port; private DistributedTextEditor dte; @@ -36,13 +37,24 @@ public void run() { if (server == null) { server = new ServerSocket(port); + System.out.println(TAG + " is hosting"); joiningSocket = server.accept(); + System.out.println(TAG + " has one friend"); + cns.setPreSocket(joiningSocket); dte.newEventReplayer(joiningSocket, myKey); - cns.setSucSocket(new Socket(joiningSocket.getInetAddress(), port)); + System.out.println(TAG + " spawns ERP"); + + //cns.setSucSocket(new Socket(joiningSocket.getInetAddress(), port)); + cns.setSucSocket(joiningSocket); + ObjectOutputStream connectStream = new ObjectOutputStream(joiningSocket.getOutputStream()); + connectStream.writeObject(new ConnectEvent()); + dte.newEventPlayer(joiningSocket, myKey); + System.out.println(TAG + " spawns EP"); + DisconnectThread disconnectThread = new DisconnectThread(dte, cns, cns.getSucSocket()); joiningSocket = null; From d8a6a3c19836dee5c3de7fbb6481aff08bbe9b97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20Malling=20=C3=98stergaard?= Date: Wed, 27 May 2015 23:16:07 +0200 Subject: [PATCH 10/18] DOOH --- src/ConnectEvent.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ConnectEvent.java b/src/ConnectEvent.java index e8a7030..76402f0 100644 --- a/src/ConnectEvent.java +++ b/src/ConnectEvent.java @@ -1,6 +1,7 @@ +import java.io.Serializable; import java.net.InetAddress; -public class ConnectEvent { +public class ConnectEvent implements Serializable { private boolean krestenInsists = true; From c8f6930468669bd8c6c15056201e0e12ff9508dc Mon Sep 17 00:00:00 2001 From: Kresten Date: Wed, 27 May 2015 23:22:56 +0200 Subject: [PATCH 11/18] 1 outStream i ServerThread --- src/ConnectEvent.java | 4 ++-- src/DisconnectThread.java | 19 +++++++++---------- src/ServerThread.java | 9 +++++---- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/ConnectEvent.java b/src/ConnectEvent.java index e8a7030..dcd6583 100644 --- a/src/ConnectEvent.java +++ b/src/ConnectEvent.java @@ -1,6 +1,6 @@ -import java.net.InetAddress; +import java.io.Serializable; -public class ConnectEvent { +public class ConnectEvent implements Serializable{ private boolean krestenInsists = true; diff --git a/src/DisconnectThread.java b/src/DisconnectThread.java index 005bc0b..a369ec4 100644 --- a/src/DisconnectThread.java +++ b/src/DisconnectThread.java @@ -22,18 +22,17 @@ public void run() { ObjectInputStream disconnectStream = new ObjectInputStream(socket.getInputStream()); Object de; while ((de = disconnectStream.readObject()) != null) { - } - - if (de instanceof DisconnectEvent) { - InetAddress newSuccessor = ((DisconnectEvent) de).getNewSuccessor(); - cns.setSucSocket(new Socket(newSuccessor, cns.getChordName().getPort())); + if (de instanceof DisconnectEvent) { + InetAddress newSuccessor = ((DisconnectEvent) de).getNewSuccessor(); + cns.setSucSocket(new Socket(newSuccessor, cns.getChordName().getPort())); - dte.newEventPlayer(cns.getSucSocket(), cns.keyOfName(cns.getChordName())); + dte.newEventPlayer(cns.getSucSocket(), cns.keyOfName(cns.getChordName())); - // New successor - socket = cns.getSucSocket(); - } else if (de instanceof ConnectEvent) { - cns.setSucSocket(cns.getPreSocket()); + // New successor + socket = cns.getSucSocket(); + } else if (de instanceof ConnectEvent) { + cns.setSucSocket(cns.getPreSocket()); + } } } } catch (IOException e) { diff --git a/src/ServerThread.java b/src/ServerThread.java index c2bb16c..89e5754 100644 --- a/src/ServerThread.java +++ b/src/ServerThread.java @@ -14,6 +14,7 @@ public class ServerThread implements Runnable { private DistributedTextEditor dte; private Socket joiningSocket; private ServerSocket server; + private ObjectOutputStream outStream; public ServerThread(DistributedTextEditor dte, ChordNameServiceImpl cns) { this.dte = dte; @@ -48,8 +49,8 @@ public void run() { //cns.setSucSocket(new Socket(joiningSocket.getInetAddress(), port)); cns.setSucSocket(joiningSocket); - ObjectOutputStream connectStream = new ObjectOutputStream(joiningSocket.getOutputStream()); - connectStream.writeObject(new ConnectEvent()); + outStream = new ObjectOutputStream(joiningSocket.getOutputStream()); + outStream.writeObject(new ConnectEvent()); dte.newEventPlayer(joiningSocket, myKey); @@ -65,9 +66,9 @@ public void run() { Socket preSocket = cns.getPreSocket(); - ObjectOutputStream disconnectStream = new ObjectOutputStream(preSocket.getOutputStream()); + outStream = new ObjectOutputStream(preSocket.getOutputStream()); - disconnectStream.writeObject(new DisconnectEvent(preSocket.getInetAddress())); + outStream.writeObject(new DisconnectEvent(preSocket.getInetAddress())); preSocket.close(); cns.setPreSocket(joiningSocket); From 6a49b14a44141cee7a424544a6ebbab7eac97f20 Mon Sep 17 00:00:00 2001 From: Kresten Date: Wed, 27 May 2015 23:24:37 +0200 Subject: [PATCH 12/18] 1 outStream i ServerThread --- src/ConnectEvent.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ConnectEvent.java b/src/ConnectEvent.java index e8a7030..ccbde2e 100644 --- a/src/ConnectEvent.java +++ b/src/ConnectEvent.java @@ -1,6 +1,6 @@ -import java.net.InetAddress; +import java.io.Serializable; -public class ConnectEvent { +public class ConnectEvent implements Serializable { private boolean krestenInsists = true; From 48bb143b0b20de088467743057ee6047c4e08b24 Mon Sep 17 00:00:00 2001 From: Kresten Date: Wed, 27 May 2015 23:50:18 +0200 Subject: [PATCH 13/18] =?UTF-8?q?Pr=C3=B8v=20det=20her=20dirty=20fix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ChordNameServiceImpl.java | 6 +++--- src/ServerThread.java | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index 723102d..7dc832b 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -57,7 +57,7 @@ public InetSocketAddress getChordName() { public void createGroup(){ serverThread = new ServerThread(dte,this); - new Thread(serverThread).run(); + new Thread(serverThread).start(); } public void joinGroup(InetSocketAddress knownPeer) { @@ -70,7 +70,7 @@ public void joinGroup(InetSocketAddress knownPeer) { // Start listening for disconnects from successor disconnectThread = new DisconnectThread(dte,this,sucSocket); - new Thread(disconnectThread).run(); + new Thread(disconnectThread).start(); dte.newEventPlayer(sucSocket, myKey); @@ -81,7 +81,7 @@ public void joinGroup(InetSocketAddress knownPeer) { // Keep listining for new joins serverThread = new ServerThread(dte,this,server); - new Thread(serverThread).run(); + new Thread(serverThread).start(); } catch (IOException e) { e.printStackTrace(); } diff --git a/src/ServerThread.java b/src/ServerThread.java index 89e5754..0aee61c 100644 --- a/src/ServerThread.java +++ b/src/ServerThread.java @@ -32,8 +32,10 @@ public ServerThread(DistributedTextEditor dte, ChordNameServiceImpl cns, ServerS @Override public void run() { int myKey = cns.keyOfName(cns.getChordName()); + dte.setTitle("I'm listening on " + cns.getChordName().getAddress() + ":" + port); try { + outStream = new ObjectOutputStream(joiningSocket.getOutputStream()); // First join if (server == null) { server = new ServerSocket(port); @@ -49,7 +51,6 @@ public void run() { //cns.setSucSocket(new Socket(joiningSocket.getInetAddress(), port)); cns.setSucSocket(joiningSocket); - outStream = new ObjectOutputStream(joiningSocket.getOutputStream()); outStream.writeObject(new ConnectEvent()); dte.newEventPlayer(joiningSocket, myKey); @@ -57,6 +58,7 @@ public void run() { System.out.println(TAG + " spawns EP"); DisconnectThread disconnectThread = new DisconnectThread(dte, cns, cns.getSucSocket()); + new Thread(disconnectThread).start(); joiningSocket = null; } @@ -66,7 +68,7 @@ public void run() { Socket preSocket = cns.getPreSocket(); - outStream = new ObjectOutputStream(preSocket.getOutputStream()); + //outStream = new ObjectOutputStream(preSocket.getOutputStream()); outStream.writeObject(new DisconnectEvent(preSocket.getInetAddress())); preSocket.close(); From aea4d5f94e8c8875129479cbea81a6e94daf53eb Mon Sep 17 00:00:00 2001 From: Kresten Date: Wed, 27 May 2015 23:52:02 +0200 Subject: [PATCH 14/18] =?UTF-8?q?Pr=C3=B8v=20det=20her=20dirty=20fix2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ServerThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServerThread.java b/src/ServerThread.java index 0aee61c..abaa527 100644 --- a/src/ServerThread.java +++ b/src/ServerThread.java @@ -35,7 +35,6 @@ public void run() { dte.setTitle("I'm listening on " + cns.getChordName().getAddress() + ":" + port); try { - outStream = new ObjectOutputStream(joiningSocket.getOutputStream()); // First join if (server == null) { server = new ServerSocket(port); @@ -51,6 +50,7 @@ public void run() { //cns.setSucSocket(new Socket(joiningSocket.getInetAddress(), port)); cns.setSucSocket(joiningSocket); + outStream = new ObjectOutputStream(joiningSocket.getOutputStream()); outStream.writeObject(new ConnectEvent()); dte.newEventPlayer(joiningSocket, myKey); From 10c6da3f0e8a5181e6c66d36bf68956c329ff1c9 Mon Sep 17 00:00:00 2001 From: henrietterohde Date: Wed, 27 May 2015 23:53:19 +0200 Subject: [PATCH 15/18] =?UTF-8?q?tr=C3=A6ls?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ServerThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServerThread.java b/src/ServerThread.java index 89e5754..312eb4d 100644 --- a/src/ServerThread.java +++ b/src/ServerThread.java @@ -57,7 +57,7 @@ public void run() { System.out.println(TAG + " spawns EP"); DisconnectThread disconnectThread = new DisconnectThread(dte, cns, cns.getSucSocket()); - + joiningSocket = null; } From a561422b717566741b5ff6c354b0830f7e0b358d Mon Sep 17 00:00:00 2001 From: Kresten Date: Thu, 28 May 2015 10:44:44 +0200 Subject: [PATCH 16/18] =?UTF-8?q?Pr=C3=B8v=20det=20her=20dirty=20fix3=20Ti?= =?UTF-8?q?meout=20og=20port+1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ChordNameServiceImpl.java | 48 ++++++++++++++++++++++++++++++----- src/DisconnectThread.java | 5 ++++ src/ServerThread.java | 15 ++++++----- 3 files changed, 55 insertions(+), 13 deletions(-) diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index 7dc832b..331797e 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -4,6 +4,7 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketException; public class ChordNameServiceImpl { @@ -18,6 +19,7 @@ public class ChordNameServiceImpl { private Socket preSocket, sucSocket; private DisconnectThread disconnectThread; + private Socket hack; public Socket getSucSocket() { return sucSocket; @@ -63,33 +65,65 @@ public void createGroup(){ public void joinGroup(InetSocketAddress knownPeer) { active = true; connectedAt = knownPeer; - try { // Setup successor sucSocket = new Socket(knownPeer.getAddress(),port); - // Start listening for disconnects from successor - disconnectThread = new DisconnectThread(dte,this,sucSocket); - new Thread(disconnectThread).start(); dte.newEventPlayer(sucSocket, myKey); // Wait for new predecessor ServerSocket server = new ServerSocket(port); + server.setSoTimeout(1000); preSocket = server.accept(); + + // Start listening for disconnects from successor + disconnectThread = new DisconnectThread(dte,this,sucSocket); + new Thread(disconnectThread).start(); + dte.newEventReplayer(preSocket, myKey); - // Keep listining for new joins + // Keep listening for new joins serverThread = new ServerThread(dte,this,server); new Thread(serverThread).start(); - } catch (IOException e) { + + } + catch (SocketException e1){ + try { + first = true; + preSocket = sucSocket; + dte.newEventReplayer(preSocket, myKey); + hack = new Socket(preSocket.getInetAddress(), port+1); + + // Start listening for disconnects from successor + disconnectThread = new DisconnectThread(dte,this,hack); + new Thread(disconnectThread).start(); + + // Keep listening for new joins + ServerSocket server = new ServerSocket(port); + serverThread = new ServerThread(dte,this,server); + new Thread(serverThread).start(); + } catch (IOException e) { + e.printStackTrace(); + } + } + catch (IOException e) { e.printStackTrace(); } } + public void notFirst(){ + first = false; + } public void leaveGroup() { + try { - ObjectOutputStream disconnectStream = new ObjectOutputStream(preSocket.getOutputStream()); + ObjectOutputStream disconnectStream = null; + if (first) { + disconnectStream = new ObjectOutputStream(hack.getOutputStream()); + } else + disconnectStream = new ObjectOutputStream(preSocket.getOutputStream()); + disconnectStream.writeObject(new DisconnectEvent(sucSocket.getInetAddress())); preSocket.close(); sucSocket.close(); diff --git a/src/DisconnectThread.java b/src/DisconnectThread.java index a369ec4..063d99d 100644 --- a/src/DisconnectThread.java +++ b/src/DisconnectThread.java @@ -18,6 +18,11 @@ public DisconnectThread(DistributedTextEditor dte, ChordNameServiceImpl cns, Soc @Override public void run() { try { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } while(true) { ObjectInputStream disconnectStream = new ObjectInputStream(socket.getInputStream()); Object de; diff --git a/src/ServerThread.java b/src/ServerThread.java index abaa527..9228937 100644 --- a/src/ServerThread.java +++ b/src/ServerThread.java @@ -48,16 +48,17 @@ public void run() { System.out.println(TAG + " spawns ERP"); - //cns.setSucSocket(new Socket(joiningSocket.getInetAddress(), port)); - cns.setSucSocket(joiningSocket); - outStream = new ObjectOutputStream(joiningSocket.getOutputStream()); - outStream.writeObject(new ConnectEvent()); + cns.setSucSocket(new Socket(joiningSocket.getInetAddress(), port)); + //cns.setSucSocket(joiningSocket); + //outStream = new ObjectOutputStream(joiningSocket.getOutputStream()); + //outStream.writeObject(new ConnectEvent()); dte.newEventPlayer(joiningSocket, myKey); System.out.println(TAG + " spawns EP"); + ServerSocket hack = new ServerSocket(port+1); - DisconnectThread disconnectThread = new DisconnectThread(dte, cns, cns.getSucSocket()); + DisconnectThread disconnectThread = new DisconnectThread(dte, cns, hack.accept()); new Thread(disconnectThread).start(); joiningSocket = null; @@ -68,7 +69,9 @@ public void run() { Socket preSocket = cns.getPreSocket(); - //outStream = new ObjectOutputStream(preSocket.getOutputStream()); + cns.notFirst(); + + outStream = new ObjectOutputStream(preSocket.getOutputStream()); outStream.writeObject(new DisconnectEvent(preSocket.getInetAddress())); preSocket.close(); From 38e5f4b38b176ba321ce3bf29a812ac1b2082176 Mon Sep 17 00:00:00 2001 From: Kresten Date: Thu, 28 May 2015 11:19:06 +0200 Subject: [PATCH 17/18] =?UTF-8?q?Exception=20virkede=20ikke.=20Nu=20med=20?= =?UTF-8?q?tr=C3=A5d?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ChordNameServiceImpl.java | 54 ++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index 331797e..f6fdc38 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -1,10 +1,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketException; +import java.net.*; public class ChordNameServiceImpl { @@ -74,23 +71,25 @@ public void joinGroup(InetSocketAddress knownPeer) { // Wait for new predecessor ServerSocket server = new ServerSocket(port); - server.setSoTimeout(1000); + System.out.println("waiting"); + final ServerSocket finalServer = server; + new Thread(new Runnable(){ + @Override + public void run() { + try { + Thread.sleep(1000); + System.out.println("about to close"); + finalServer.close(); + System.out.println("closed"); + first = true; + } catch (InterruptedException | IOException e) { + e.printStackTrace(); + } + } + }).start(); preSocket = server.accept(); - - // Start listening for disconnects from successor - disconnectThread = new DisconnectThread(dte,this,sucSocket); - new Thread(disconnectThread).start(); - - dte.newEventReplayer(preSocket, myKey); - - // Keep listening for new joins - serverThread = new ServerThread(dte,this,server); - new Thread(serverThread).start(); - - } - catch (SocketException e1){ - try { - first = true; + System.out.println("Done waiting"); + if(first){ preSocket = sucSocket; dte.newEventReplayer(preSocket, myKey); hack = new Socket(preSocket.getInetAddress(), port+1); @@ -100,11 +99,20 @@ public void joinGroup(InetSocketAddress knownPeer) { new Thread(disconnectThread).start(); // Keep listening for new joins - ServerSocket server = new ServerSocket(port); + server = new ServerSocket(port); serverThread = new ServerThread(dte,this,server); new Thread(serverThread).start(); - } catch (IOException e) { - e.printStackTrace(); + } + else { + // Start listening for disconnects from successor + disconnectThread = new DisconnectThread(dte, this, sucSocket); + new Thread(disconnectThread).start(); + + dte.newEventReplayer(preSocket, myKey); + + // Keep listening for new joins + serverThread = new ServerThread(dte, this, server); + new Thread(serverThread).start(); } } catch (IOException e) { From ffd64a370c032c90c7621a9b604f689bed7d4588 Mon Sep 17 00:00:00 2001 From: Kresten Date: Thu, 28 May 2015 11:27:00 +0200 Subject: [PATCH 18/18] =?UTF-8?q?Nu=20med=20tr=C3=A5d=20OG=20exception!?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ChordNameServiceImpl.java | 38 ++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/ChordNameServiceImpl.java b/src/ChordNameServiceImpl.java index f6fdc38..bd5f924 100644 --- a/src/ChordNameServiceImpl.java +++ b/src/ChordNameServiceImpl.java @@ -81,7 +81,6 @@ public void run() { System.out.println("about to close"); finalServer.close(); System.out.println("closed"); - first = true; } catch (InterruptedException | IOException e) { e.printStackTrace(); } @@ -89,31 +88,38 @@ public void run() { }).start(); preSocket = server.accept(); System.out.println("Done waiting"); - if(first){ - preSocket = sucSocket; - dte.newEventReplayer(preSocket, myKey); - hack = new Socket(preSocket.getInetAddress(), port+1); + System.out.println("else"); + // Start listening for disconnects from successor + disconnectThread = new DisconnectThread(dte, this, sucSocket); + new Thread(disconnectThread).start(); + + dte.newEventReplayer(preSocket, myKey); + + // Keep listening for new joins + serverThread = new ServerThread(dte, this, server); + new Thread(serverThread).start(); + + } + catch (SocketException e1){ + first = true; + System.out.println("first"); + preSocket = sucSocket; + dte.newEventReplayer(preSocket, myKey); + try { + hack = new Socket(preSocket.getInetAddress(), port+1); // Start listening for disconnects from successor disconnectThread = new DisconnectThread(dte,this,hack); new Thread(disconnectThread).start(); // Keep listening for new joins - server = new ServerSocket(port); + ServerSocket server = new ServerSocket(port); serverThread = new ServerThread(dte,this,server); new Thread(serverThread).start(); + } catch (IOException e) { + e.printStackTrace(); } - else { - // Start listening for disconnects from successor - disconnectThread = new DisconnectThread(dte, this, sucSocket); - new Thread(disconnectThread).start(); - - dte.newEventReplayer(preSocket, myKey); - // Keep listening for new joins - serverThread = new ServerThread(dte, this, server); - new Thread(serverThread).start(); - } } catch (IOException e) { e.printStackTrace();