nodechan

decentralized peer-to-peer anonymous messageboard client
git clone git://squid-tech.com/nodechan.git
Log | Files | Refs | README

commit 9ba6909595f8bb7953f995d858795185692ca1e2
parent e246e877910b4e0bab661e257718bdcd53c9b510
Author: Josh <jxm5210@g.rit.edu>
Date:   Mon, 12 Aug 2019 01:32:22 -0400

Merge pull request #4 from joshiemoore/request-threads

Request-threads
Diffstat:
Mformat.txt | 17+++++++++++++++--
Mmakefile | 2+-
Msrc/ChanPost.java | 59++++++++++++++++++++++++++++++++++++++++++++++++++++-------
Msrc/ChanThread.java | 31+++++++++++++------------------
Msrc/GUIMain.java | 8++++----
Msrc/GUIThreadView.java | 22++++++++++++----------
Msrc/IncomingThread.java | 127++++++++++++++++++++++++++++++++++++++++++-------------------------------------
Msrc/NodeChan.java | 70++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
Asrc/PacketQueuer.java | 42++++++++++++++++++++++++++++++++++++++++++
Asrc/RequestedThreadSender.java | 38++++++++++++++++++++++++++++++++++++++
10 files changed, 306 insertions(+), 110 deletions(-)

diff --git a/format.txt b/format.txt @@ -9,6 +9,8 @@ BYTE 1 - 'C' BYTE 2 - type -'P' --> post + -'H' --> hello-packet + -'R' --> request-packet BYTE 3 - flags @@ -21,9 +23,11 @@ BYTE 8-15 - thread ID BYTE 16-23 - post ID -BYTE 24-73 - title of the thread the post belongs to +BYTE 24-31 - post time -BYTE 74-329 - post text +BYTE 32-81 - title of the thread the post belongs to + +BYTE 82-337 - post text @@ -31,3 +35,12 @@ Layout of a hello-packet ('H'): Hello-packets contain no data beyond the header bytes. These packets are only used for a client to add itself to one of its peers' peer list, or for the client to keep itself alive in its peers' peer list. + + + +Layout of a request-packet ('R'): +BYTE 8-15 - TID + +When a client receives a request-packet, it will send its copy of the thread +specified by to the requesting client. This is typically used when a client first +connects to the network and is trying to gather threads. diff --git a/makefile b/makefile @@ -1,4 +1,4 @@ -NODE_DEPENDS = lib/WaifUPnP.jar src/ChanPost.java src/ChanThread.java src/GUIAddPeer.java src/GUICreateNewThread.java src/GUIMain.java src/GUIRightClickMenu.java src/GUIThreadView.java src/IncomingThread.java src/NodeChan.java src/OutgoingThread.java src/Peer.java +NODE_DEPENDS = lib/WaifUPnP.jar src/ChanPost.java src/ChanThread.java src/GUIAddPeer.java src/GUICreateNewThread.java src/GUIMain.java src/GUIRightClickMenu.java src/GUIThreadView.java src/IncomingThread.java src/NodeChan.java src/OutgoingThread.java src/Peer.java src/RequestedThreadSender.java src/PacketQueuer.java JAR_DEPENDS = NodeChan.jar manifest.mf lib/WaifUPnP.jar NodeChan.jar: $(NODE_DEPENDS) diff --git a/src/ChanPost.java b/src/ChanPost.java @@ -5,6 +5,8 @@ import java.util.Random; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.ByteBuffer; + /** * * This class represents a single post in a NodeChan thread. @@ -72,17 +74,38 @@ public class ChanPost { } /** + * For sorting purposes + * Sort from most recent (so, highest) post time to oldest (top-to-bottom) + */ + public int compareTo(ChanPost other) { + long t = this.postTime - other.getPostTime(); + + if (t < 0) return -1; + else if (t > 0) return 1; + else return 0; + } + + public void setTitle(String title) { + this.title = title; + } + + public void setPostTime(long time) { + this.postTime = time; + } + + /** * Convert a ChanPost into a byte array for transmission over UDP. See * format.txt for information about packet formatting. */ public static byte[] encodeUDP(ChanPost out) { - byte[] result = new byte[330]; + byte[] result = new byte[338]; String outTid = out.getTid(); String outPid = out.getPid(); byte[] out_addr = out.getSender_addr().getAddress(); String outTitle = out.getTitle(); String outText = out.getText(); + long outTime = out.getPostTime(); // NodeChan header result[0] = 'N'; @@ -109,21 +132,29 @@ public class ChanPost { result[i + 16] = (byte)outPid.charAt(i); } + // post time + ByteBuffer timeBuffer = ByteBuffer.allocate(Long.BYTES).putLong(outTime); + byte[] time = timeBuffer.array(); + + for (int i = 0; i < 8; i++) { + result[i + 24] = time[i]; + } + // title for (int i = 0; i < 50; i++) { if (i < outTitle.length()) { - result[24 + i] = (byte)outTitle.charAt(i); + result[32 + i] = (byte)outTitle.charAt(i); } else { - result[24 + i] = (byte)0; + result[32 + i] = (byte)0; } } // post text for (int i = 0; i < 256; i++) { if (i < outText.length()) { - result[74 + i] = (byte)outText.charAt(i); + result[82 + i] = (byte)outText.charAt(i); } else { - result[74 + i] = (byte)0; + result[82 + i] = (byte)0; } } @@ -153,10 +184,22 @@ public class ChanPost { String newTid = new String(in, 8, 8); String newPid = new String(in, 16, 8); - String newTitle = new String(in, 24, 50); + + long newTime = 0; + byte[] newTimeArr = new byte[8]; + for (int i = 0; i < 8; i++) { + newTimeArr[i] = in[i + 24]; + } + + ByteBuffer newBuffer = ByteBuffer.allocate(Long.BYTES); + newBuffer.put(newTimeArr); + newBuffer.flip(); + newTime = newBuffer.getLong(); + + String newTitle = new String(in, 32, 50); String newText = ""; - for (int i = 74; i < in.length; i++) { + for (int i = 82; i < in.length; i++) { newText = newText + (char)in[i]; } @@ -179,6 +222,8 @@ public class ChanPost { newText ); + result.setPostTime(newTime); + return result; } diff --git a/src/ChanThread.java b/src/ChanThread.java @@ -53,24 +53,15 @@ public class ChanThread { * Sorted by post time - earliest first. */ public void addPost(ChanPost post) { - if (posts.size() == 0 && post.getIsRoot()) { + if (post.getIsRoot()) { // this is the first post in the thread this.title = post.getTitle(); - this.posts.add(post); - } else { - // sort by time (earliest first) - for (int i = 0; i < posts.size() - 1; i++) { - if (posts.get(i).getPostTime() < post.getPostTime() && - posts.get(i + 1).getPostTime() >= post.getPostTime()) { - this.posts.add(i + 1, post); - return; - } - } - - this.posts.add(post); } - this.lastTime = post.getPostTime(); + this.posts.add(post); + + if (post.getPostTime() > this.lastTime) + this.lastTime = post.getPostTime(); } /** @@ -85,6 +76,10 @@ public class ChanThread { else return 0; } + public void setTitle(String title) { + this.title = title; + } + /* * getters @@ -105,14 +100,14 @@ public class ChanThread { return this.lastTime; } - /** - * We want to avoid giving outside classes direct access to the posts - * ArrayList - */ public ChanPost getPost(int i) { return this.posts.get(i); } + public ArrayList<ChanPost> getPosts() { + return this.posts; + } + /** * Delete a post (in case of blocking) */ diff --git a/src/GUIMain.java b/src/GUIMain.java @@ -24,7 +24,7 @@ import javax.swing.border.EmptyBorder; import javax.swing.border.LineBorder; import javax.swing.JPopupMenu; -import java.util.ArrayList; +import java.util.List; import java.util.Vector; /** @@ -34,9 +34,9 @@ import java.util.Vector; */ public class GUIMain extends JFrame { /** This user's list of threads **/ - private ArrayList<ChanThread> threads; + private List<ChanThread> threads; /** This user's list of peers **/ - private ArrayList<Peer> peers; + private List<Peer> peers; /** The list of threads that will be displayed to the user **/ private JList<ChanThread> threadList; @@ -64,7 +64,7 @@ public class GUIMain extends JFrame { JButton newThread; JLabel statusNumPeers; - public GUIMain(ArrayList<ChanThread> threads, ArrayList<Peer> peers) { + public GUIMain(List<ChanThread> threads, List<Peer> peers) { super("NodeChan"); this.threads = threads; this.peers = peers; diff --git a/src/GUIThreadView.java b/src/GUIThreadView.java @@ -2,6 +2,8 @@ package com.squidtech.nodechan; import java.util.ArrayList; import java.util.Vector; +import java.util.Collections; +import java.util.Comparator; import java.awt.BorderLayout; import java.awt.FlowLayout; @@ -58,12 +60,7 @@ public class GUIThreadView extends JFrame { this.setLayout(new BorderLayout()); this.setSize(540, 380); - threadPosts = new ArrayList<ChanPost>(); - - // initialize the posts - for (int i = 0; i < thread.getNumPosts(); i++) { - threadPosts.add(thread.getPost(i)); - } + threadPosts = thread.getPosts(); replyBar = new JPanel(new BorderLayout()); replyBar.setBorder(new CompoundBorder(new LineBorder(Color.DARK_GRAY), @@ -131,14 +128,19 @@ public class GUIThreadView extends JFrame { this.add(scrollPane); this.setVisible(true); + + refreshPosts(); } public void refreshPosts() { - threadPosts = new ArrayList<ChanPost>(); + threadPosts = thread.getPosts(); - for (int i = 0; i < thread.getNumPosts(); i++) { - threadPosts.add(thread.getPost(i)); - } + Collections.sort(threadPosts, new Comparator<ChanPost>() { + @Override + public int compare(ChanPost post1, ChanPost post2) { + return post1.compareTo(post2); + } + }); chanPostJList.setListData(new Vector<ChanPost>(threadPosts)); } diff --git a/src/IncomingThread.java b/src/IncomingThread.java @@ -5,7 +5,7 @@ import java.net.DatagramPacket; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.ArrayList; +import java.util.List; import java.io.IOException; @@ -17,33 +17,28 @@ import java.util.Comparator; * incoming data as necessary. */ public class IncomingThread extends Thread { - /** The socket we're receiving UDP through **/ - DatagramSocket sock; + /** The packet queue to pull packets from **/ + List<byte[]> queue; /** The local ChanThread storage **/ - ArrayList<ChanThread> threads; + List<ChanThread> threads; /** The local list of peers **/ - ArrayList<Peer> peers; + List<Peer> peers; - public IncomingThread(DatagramSocket sock, ArrayList<ChanThread> threads, ArrayList<Peer> peers) { - this.sock = sock; + public IncomingThread(List<byte[]> queue, List<ChanThread> threads, List<Peer> peers) { + this.queue = queue; this.threads = threads; this.peers = peers; } public void run() { - // handle incoming packets indefinitely + // process incoming packets indefinitely while (true) { - byte[] recv_data = new byte[326]; + // wait for a packet to come + while(queue.size() == 0); - DatagramPacket receivePacket = new DatagramPacket(recv_data, recv_data.length); - - try { - sock.receive(receivePacket); - } catch (IOException e) { - continue; - } + byte[] recv_data = queue.remove(0); // check header if (recv_data[0] != 'N' || recv_data[1] != 'C') continue; @@ -88,56 +83,46 @@ public class IncomingThread extends Thread { // decode the post packet ChanPost post = ChanPost.decodeUDP(recv_data); - if (post.getIsRoot()) { - // check whether we already have a copy of this OP - boolean haveOP = false; - - for (ChanThread t : threads) { - if (t.getTid().equals(post.getTid())) { - haveOP = true; - post = t.getPost(0); - break; - } - } - if (!haveOP) { - // create a new local thread with this OP - ChanThread newThread = new ChanThread(post.getTid()); - newThread.addPost(post); - threads.add(newThread); - } - } else { - // check whether we have this thread - // if not, ignore this post, since it would be pointless to start - // in the middle of the conversation - ChanThread existThread = null; - - for (ChanThread t : threads) { - if (t.getTid().equals(post.getTid())) { - existThread = t; - break; - } + // check whether we have this thread + // if not, ignore this post, since it would be pointless to start + // in the middle of the conversation + ChanThread existThread = null; + + for (ChanThread t : threads) { + if (t.getTid().equals(post.getTid())) { + existThread = t; + break; } + } - if (existThread != null) { - // check whether we already have this post - boolean havePost = false; + if (existThread != null) { + // check whether we already have this post + boolean havePost = false; - for (int i = 0; i < existThread.getNumPosts(); i++) { - if (existThread.getPost(i).getPid().equals(post.getPid())) { - havePost = true; - post = existThread.getPost(i); - break; - } + for (int i = 0; i < existThread.getNumPosts(); i++) { + if (existThread.getPost(i).getPid().equals(post.getPid())) { + havePost = true; + post = existThread.getPost(i); + break; } + } - if (!havePost) { - // we have the thread, but don't have this post yet, so add it - existThread.addPost(post); - } - } else { - // ignore the post... for now + if (!havePost) { + // we have the thread, but don't have this post yet, so add it + existThread.addPost(post); } + } else { + // we don't have this thread yet, so we will create a new local + // copy, and also ask the sending peer for the rest of the thread + ChanThread tempThread = new ChanThread(post.getTid()); + tempThread.addPost(post); + tempThread.setTitle(post.getTitle()); + threads.add(tempThread); + + // request the complete thread from the client that just + // sent us this post + NodeChan.requestThread(tempThread.getTid(), incoming); } // forward this packet to all peers (except the peer we received the @@ -167,6 +152,28 @@ public class IncomingThread extends Thread { case 'H': // do nothing, the hello-packet is just for adding new peers break; + case 'R': + // send a copy of the specified thread to the user we received + // the thread-request from + String tid = ""; + ChanThread reqThread = null; + + for (int i = 0; i < 8; i++) { + tid += ((char) recv_data[i + 8]); + } + + // find the requested thread in this client's thread list + for (int i = 0; i < threads.size(); i++) { + if (threads.get(i).getTid().equals(tid)) { + reqThread = threads.get(i); + break; + } + } + + if (reqThread == null) continue; + + new RequestedThreadSender(reqThread, incoming).start(); + break; } // check for peers that have timed out @@ -174,7 +181,7 @@ public class IncomingThread extends Thread { // update the GUI when we receive packets, if GUI mode and auto-refresh // are both enabled - if (!NodeChan.nogui && NodeChan.autorefresh) { + if (!NodeChan.nogui && NodeChan.autorefresh && NodeChan.mainGui != null) { NodeChan.mainGui.refreshThreads(); } } diff --git a/src/NodeChan.java b/src/NodeChan.java @@ -2,9 +2,11 @@ package com.squidtech.nodechan; import java.util.Scanner; import java.util.ArrayList; +import java.util.List; import java.util.InputMismatchException; import java.util.Collections; import java.util.Comparator; +import java.util.concurrent.CopyOnWriteArrayList; import java.io.BufferedReader; import java.io.InputStreamReader; @@ -85,14 +87,20 @@ public class NodeChan { /** Incoming packet-handling thread **/ private static IncomingThread nc_incoming; + /** Incoming packet-queuing thread **/ + private static PacketQueuer nc_queuer; + + /** stores incoming packets for the packet handling thread to process **/ + private static List<byte[]> nc_packet_queue; + /** List of this node's peers **/ - private static ArrayList<Peer> peers; + private static List<Peer> peers; /** Local list of ChanThreads this user has received **/ - private static ArrayList<ChanThread> threads; + private static List<ChanThread> threads; /** List of users that this user has blocked **/ - private static ArrayList<Peer> blocked; + private static List<Peer> blocked; /** URL of the peer tracker to use **/ public static String peerTrackerURL = "http://squid-tech.com/nodes/peer.php?ip="; @@ -111,9 +119,14 @@ public class NodeChan { System.out.println("Welcome to NodeChan."); - peers = new ArrayList<Peer>(); - threads = new ArrayList<ChanThread>(); - blocked = new ArrayList<Peer>(); + //peers = new ArrayList<Peer>(); + //threads = new ArrayList<ChanThread>(); + //blocked = new ArrayList<Peer>(); + + peers = new CopyOnWriteArrayList<Peer>(); + threads = new CopyOnWriteArrayList<ChanThread>(); + blocked = new CopyOnWriteArrayList<Peer>(); + nc_packet_queue = new CopyOnWriteArrayList<byte[]>(); // get the local ip address if (!local) { @@ -177,9 +190,13 @@ public class NodeChan { return; } + // initialize packet receiving/queuing thread + nc_queuer = new PacketQueuer(nc_packet_queue, nc_socket); // initialize incoming packet-handling thread - nc_incoming = new IncomingThread(nc_socket, threads, peers); + nc_incoming = new IncomingThread(nc_packet_queue, threads, peers); + + nc_queuer.start(); nc_incoming.start(); // command-line inputs @@ -635,7 +652,6 @@ public class NodeChan { hello[2] = 'H'; // flags - // TODO: request threads from peer hello[3] = 0; // this node's IP @@ -749,4 +765,42 @@ public class NodeChan { return false; } + + /** + * This method sends a request for another client to send a complete copy + * of the thread specified by "tid" + * This method is used when a client receives a post that is not the + * actual OP of the thread, so they need the rest of the thread for the + * out-of-order received post to make sense + * recip is the InetAddress of the client we're requesting the rest of + * the thread from + */ + public static void requestThread(String tid, InetAddress recip) { + byte[] request = new byte[16]; + + // header bytes + request[0] = 'N'; + request[1] = 'C'; + + // post type + request[2] = 'R'; + + // flags + request[3] = 0; + + // this node's IP + byte[] node_addr_bytes = node_ip.getAddress(); + + for (int i = 0; i < 4; i++) { + request[i + 4] = node_addr_bytes[i]; + } + + // the TID of the thread this client is requesting + for (int i = 0; i < 8; i++) { + request[i + 8] = (byte)tid.charAt(i); + } + + // send the request-packet to the peer + new OutgoingThread(recip, NC_PORT, request).start(); + } } diff --git a/src/PacketQueuer.java b/src/PacketQueuer.java @@ -0,0 +1,42 @@ +package com.squidtech.nodechan; + +import java.util.List; + +import java.net.DatagramSocket; +import java.net.DatagramPacket; + +import java.io.IOException; + +/** + * This class acts as a queue for incoming packets, so the IncomingThread class can process + * a packet without interfering with the receipt of any further packets + */ +public class PacketQueuer extends Thread { + /** The queue to add incoming packets to **/ + List<byte[]> queue; + + /** The socket to listen on **/ + DatagramSocket socket; + + public PacketQueuer(List<byte[]> queue, DatagramSocket socket) { + this.queue = queue; + this.socket = socket; + } + + public void run() { + while(true) { + byte[] recv_data = new byte[338]; + + DatagramPacket receivePacket = new DatagramPacket(recv_data, recv_data.length); + + try { + socket.receive(receivePacket); + } catch (IOException e) { + continue; + } + + // add the packet to the queue + queue.add(recv_data); + } + } +} diff --git a/src/RequestedThreadSender.java b/src/RequestedThreadSender.java @@ -0,0 +1,38 @@ +package com.squidtech.nodechan; + +import java.net.InetAddress; + +/** + * This thread is invoked to separate sending entire groups of posts + * from the main processing threads + */ +public class RequestedThreadSender extends Thread { + /** The thread containing the posts we are sending **/ + ChanThread thread; + + /** The recipient of the thread **/ + InetAddress recip; + + public RequestedThreadSender(ChanThread thread, InetAddress recip) { + this.thread = thread; + this.recip = recip; + } + + public void run() { + // Send all posts in the thread to the recipient + for (int i = 0; i < thread.getNumPosts(); i++) { + byte[] out = ChanPost.encodeUDP(thread.getPost(i)); + + OutgoingThread outThread = new OutgoingThread(recip, NodeChan.NC_PORT, out); + + outThread.start(); + + try { + outThread.join(); + } catch (InterruptedException e) { + System.out.println("RequestedThreadSender interrupted"); + break; + } + } + } +}