From 3567b88c0dfc1662eb48775e9dfd8838aab0df92 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Mon, 9 Feb 2015 21:33:27 +0100 Subject: [PATCH] Added flood-based routing. --- PROTOCOL.md | 7 +- .../ensichat/protocol/RouterTest.scala | 81 +++++++++++++++++++ .../nutomic/ensichat/protocol/UserTest.scala | 4 +- .../protocol/messages/MessageHeaderTest.scala | 10 +-- app/src/main/res/values/strings.xml | 3 + app/src/main/res/xml/settings.xml | 6 ++ .../activities/AddContactsActivity.scala | 5 +- .../bluetooth/BluetoothInterface.scala | 6 +- .../ensichat/bluetooth/TransferThread.scala | 7 +- .../ensichat/fragments/SettingsFragment.scala | 12 ++- .../ensichat/protocol/ChatService.scala | 67 ++++++++------- .../nutomic/ensichat/protocol/Router.scala | 47 +++++++++++ .../ensichat/protocol/SeqNumGenerator.scala | 24 ++++++ .../protocol/messages/MessageHeader.scala | 10 ++- 14 files changed, 240 insertions(+), 49 deletions(-) create mode 100644 app/src/androidTest/scala/com/nutomic/ensichat/protocol/RouterTest.scala create mode 100644 app/src/main/scala/com/nutomic/ensichat/protocol/Router.scala create mode 100644 app/src/main/scala/com/nutomic/ensichat/protocol/SeqNumGenerator.scala diff --git a/PROTOCOL.md b/PROTOCOL.md index 78c7436..cc3e195 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -71,7 +71,7 @@ header is in network byte order, i.e. big endian. | Target Address | | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Reserved | + | Sequence Number | Reserved | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Version specifies the protocol version number. This is currently 0. A @@ -100,6 +100,11 @@ message. Target Address is the address of the node that should receive the message. +Sequence number is set by the sender, and MUST increment by 1 for +each new message sent (after 2^16-1 comes 0 again). It SHOULD +be persistent during restarts. It is used by intermediate nodes +to avoid forwarding the same message multiple times. + ### Encryption Data diff --git a/app/src/androidTest/scala/com/nutomic/ensichat/protocol/RouterTest.scala b/app/src/androidTest/scala/com/nutomic/ensichat/protocol/RouterTest.scala new file mode 100644 index 0000000..d23824a --- /dev/null +++ b/app/src/androidTest/scala/com/nutomic/ensichat/protocol/RouterTest.scala @@ -0,0 +1,81 @@ +package com.nutomic.ensichat.protocol + +import android.test.AndroidTestCase +import android.util.Log +import com.nutomic.ensichat.protocol.messages._ +import junit.framework.Assert._ + +class RouterTest extends AndroidTestCase { + + private def neighbors() = Set[Address](AddressTest.a1, AddressTest.a2, AddressTest.a3) + + private val msg = generateMessage(AddressTest.a1, AddressTest.a4, 1) + + /** + * Messages should be sent to all neighbors. + */ + def testFlooding(): Unit = { + var sentTo = Set[Address]() + val router: Router = new Router(neighbors, + (a, m) => { + assertEquals(msg, m) + sentTo += a + }) + + router.onReceive(msg) + assertEquals(neighbors(), sentTo) + } + + /** + * Messages from different senders with the same sequence number should be forwarded. + */ + def testDifferentSenders(): Unit = { + var sentTo = Set[Address]() + val router: Router = new Router(neighbors, (a, m) => sentTo += a) + + router.onReceive(msg) + assertEquals(neighbors(), sentTo) + + sentTo = Set[Address]() + router.onReceive(generateMessage(AddressTest.a2, AddressTest.a4, 1)) + assertEquals(neighbors(), sentTo) + } + + /** + * Messages from the same sender with the same sequence number should be ignored. + */ + def testIgnores(): Unit = { + var sentTo = Set[Address]() + val router: Router = new Router(neighbors, (a, m) => sentTo += a) + + router.onReceive(msg) + assertEquals(neighbors(), sentTo) + + sentTo = Set[Address]() + router.onReceive(generateMessage(AddressTest.a1, AddressTest.a2, 1)) + assertTrue(sentTo.isEmpty) + } + + def testDiscardOldIgnores(): Unit = { + def test(first: Int, second: Int) { + var sentTo = Set[Address]() + val router: Router = new Router(neighbors, (a, m) => sentTo += a) + router.onReceive(generateMessage(AddressTest.a1, AddressTest.a3, first)) + router.onReceive(generateMessage(AddressTest.a1, AddressTest.a3, second)) + + sentTo = Set[Address]() + router.onReceive(generateMessage(AddressTest.a1, AddressTest.a3, first)) + assertEquals(neighbors(), sentTo) + } + + test(1, MessageHeader.SeqNumRange.last) + test(MessageHeader.SeqNumRange.last / 2, MessageHeader.SeqNumRange.last) + test(MessageHeader.SeqNumRange.last / 2, 1) + } + + private def generateMessage(sender: Address, receiver: Address, seqNum: Int): Message = { + val header = new MessageHeader(0, MessageHeader.DefaultHopLimit, sender, receiver, seqNum) + new Message(header, new CryptoData(None, None), new UserName("")) + } + +} \ No newline at end of file diff --git a/app/src/androidTest/scala/com/nutomic/ensichat/protocol/UserTest.scala b/app/src/androidTest/scala/com/nutomic/ensichat/protocol/UserTest.scala index 2c4a2eb..1a5c30e 100644 --- a/app/src/androidTest/scala/com/nutomic/ensichat/protocol/UserTest.scala +++ b/app/src/androidTest/scala/com/nutomic/ensichat/protocol/UserTest.scala @@ -3,7 +3,9 @@ package com.nutomic.ensichat.protocol object UserTest { val u1 = new User(AddressTest.a1, "one") - + val u2 = new User(AddressTest.a2, "two") + + val u3 = new User(AddressTest.a3, "three") } \ No newline at end of file diff --git a/app/src/androidTest/scala/com/nutomic/ensichat/protocol/messages/MessageHeaderTest.scala b/app/src/androidTest/scala/com/nutomic/ensichat/protocol/messages/MessageHeaderTest.scala index a2adadd..4eb8dac 100644 --- a/app/src/androidTest/scala/com/nutomic/ensichat/protocol/messages/MessageHeaderTest.scala +++ b/app/src/androidTest/scala/com/nutomic/ensichat/protocol/messages/MessageHeaderTest.scala @@ -8,15 +8,15 @@ import junit.framework.Assert._ object MessageHeaderTest { val h1 = new MessageHeader(Text.Type, MessageHeader.DefaultHopLimit, AddressTest.a1, - AddressTest.a2, 1234, 0) + AddressTest.a2, 5, 1234, 0) - val h2 = new MessageHeader(Text.Type, 0, AddressTest.a1, AddressTest.a3, 8765, 234) + val h2 = new MessageHeader(Text.Type, 0, AddressTest.a1, AddressTest.a3, 30000, 8765, 234) - val h3 = new MessageHeader(Text.Type, 0xff, AddressTest.a4, AddressTest.a2, 0, 56) + val h3 = new MessageHeader(Text.Type, 0xff, AddressTest.a4, AddressTest.a2, 250, 0, 56) - val h4 = new MessageHeader(0xfff, 0, Address.Null, Address.Broadcast, 0, 0xff) + val h4 = new MessageHeader(0xfff, 0, Address.Null, Address.Broadcast, MessageHeader.SeqNumRange.last, 0, 0xff) - val h5 = new MessageHeader(ConnectionInfo.Type, 0xff, Address.Broadcast, Address.Null, 0xffff, 0) + val h5 = new MessageHeader(ConnectionInfo.Type, 0xff, Address.Broadcast, Address.Null, 0, 0xffff, 0) val headers = Set(h1, h2, h3, h4, h5) diff --git a/app/src/main/res/values/strings.xml b/app/src/main/res/values/strings.xml index 03f5562..f79a2f7 100644 --- a/app/src/main/res/values/strings.xml +++ b/app/src/main/res/values/strings.xml @@ -62,6 +62,9 @@ Scan Interval (seconds) + + + Maximum Number of Connections diff --git a/app/src/main/res/xml/settings.xml b/app/src/main/res/xml/settings.xml index 5d81321..8ead831 100644 --- a/app/src/main/res/xml/settings.xml +++ b/app/src/main/res/xml/settings.xml @@ -12,5 +12,11 @@ android:defaultValue="15" android:inputType="number" android:numeric="integer" /> + + \ No newline at end of file diff --git a/app/src/main/scala/com/nutomic/ensichat/activities/AddContactsActivity.scala b/app/src/main/scala/com/nutomic/ensichat/activities/AddContactsActivity.scala index 0be772b..1e4dd82 100644 --- a/app/src/main/scala/com/nutomic/ensichat/activities/AddContactsActivity.scala +++ b/app/src/main/scala/com/nutomic/ensichat/activities/AddContactsActivity.scala @@ -7,7 +7,7 @@ import android.view._ import android.widget.AdapterView.OnItemClickListener import android.widget._ import com.nutomic.ensichat.R -import com.nutomic.ensichat.protocol.ChatService +import com.nutomic.ensichat.protocol.{User, ChatService} import com.nutomic.ensichat.protocol.messages.RequestAddContact import com.nutomic.ensichat.util.Database.OnContactsUpdatedListener import com.nutomic.ensichat.util.UsersAdapter @@ -69,7 +69,8 @@ class AddContactsActivity extends EnsiChatActivity with ChatService.OnConnection runOnUiThread(new Runnable { override def run(): Unit = { adapter.clear() - (service.getConnections -- service.database.getContacts).foreach(adapter.add) + (service.connections().map(a => service.getUser(a)) -- service.database.getContacts) + .foreach(adapter.add) } }) } diff --git a/app/src/main/scala/com/nutomic/ensichat/bluetooth/BluetoothInterface.scala b/app/src/main/scala/com/nutomic/ensichat/bluetooth/BluetoothInterface.scala index b7c274a..a91d4fc 100644 --- a/app/src/main/scala/com/nutomic/ensichat/bluetooth/BluetoothInterface.scala +++ b/app/src/main/scala/com/nutomic/ensichat/bluetooth/BluetoothInterface.scala @@ -180,10 +180,10 @@ class BluetoothInterface(Service: ChatService, Crypto: Crypto) extends Interface } /** - * Sends the message to the target address specified in the message header. + * Sends the message to nextHop. */ - override def send(msg: Message): Unit = - connections.apply(addressDeviceMap.get(msg.Header.Target)).send(msg) + override def send(nextHop: Address, msg: Message): Unit = + connections.get(addressDeviceMap.get(nextHop)).foreach(_.send(msg)) /** * Returns all active Bluetooth connections. diff --git a/app/src/main/scala/com/nutomic/ensichat/bluetooth/TransferThread.scala b/app/src/main/scala/com/nutomic/ensichat/bluetooth/TransferThread.scala index f875986..fab9048 100644 --- a/app/src/main/scala/com/nutomic/ensichat/bluetooth/TransferThread.scala +++ b/app/src/main/scala/com/nutomic/ensichat/bluetooth/TransferThread.scala @@ -53,13 +53,11 @@ class TransferThread(device: Device, socket: BluetoothSocket, Handler: Bluetooth val msg = Message.read(inStream) onReceive(msg, device.Id) + Log.v(Tag, "Receiving " + msg) } } catch { - case e: ReadMessageException => - Log.i(Tag, "Failed to read message", e) - case e: IOException => + case e @ (_: ReadMessageException | _: IOException) => Log.w(Tag, "Failed to read incoming message", e) - close() return } } @@ -69,6 +67,7 @@ class TransferThread(device: Device, socket: BluetoothSocket, Handler: Bluetooth def send(msg: Message): Unit = { try { outStream.write(msg.write) + Log.v(Tag, "Sending " + msg) } catch { case e: IOException => Log.e(Tag, "Failed to write message", e) } diff --git a/app/src/main/scala/com/nutomic/ensichat/fragments/SettingsFragment.scala b/app/src/main/scala/com/nutomic/ensichat/fragments/SettingsFragment.scala index 22d45d3..91f87f4 100644 --- a/app/src/main/scala/com/nutomic/ensichat/fragments/SettingsFragment.scala +++ b/app/src/main/scala/com/nutomic/ensichat/fragments/SettingsFragment.scala @@ -13,7 +13,9 @@ object SettingsFragment { val KeyUserName = "user_name" val KeyScanInterval = "scan_interval_seconds" - + + val MaxConnections = "max_connections" + } /** @@ -25,14 +27,18 @@ class SettingsFragment extends PreferenceFragment with OnPreferenceChangeListene super.onCreate(savedInstanceState) addPreferencesFromResource(R.xml.settings) - val name = findPreference(KeyUserName) - val scanInterval = findPreference(KeyScanInterval) + val name = findPreference(KeyUserName) + val scanInterval = findPreference(KeyScanInterval) + val maxConnections = findPreference(MaxConnections) name.setOnPreferenceChangeListener(this) scanInterval.setOnPreferenceChangeListener(this) + maxConnections.setOnPreferenceChangeListener(this) val pm = PreferenceManager.getDefaultSharedPreferences(getActivity) name.setSummary(pm.getString(KeyUserName, "")) scanInterval.setSummary(pm.getString(KeyScanInterval, "15")) + maxConnections.setDefaultValue(Int.MaxValue) + maxConnections.setSummary(pm.getString(MaxConnections, Int.MaxValue.toString)) } /** diff --git a/app/src/main/scala/com/nutomic/ensichat/protocol/ChatService.scala b/app/src/main/scala/com/nutomic/ensichat/protocol/ChatService.scala index 0614964..ce70650 100644 --- a/app/src/main/scala/com/nutomic/ensichat/protocol/ChatService.scala +++ b/app/src/main/scala/com/nutomic/ensichat/protocol/ChatService.scala @@ -26,7 +26,7 @@ object ChatService { def destroy(): Unit - def send(msg: Message): Unit + def send(nextHop: Address, msg: Message): Unit } @@ -59,9 +59,13 @@ class ChatService extends Service { private lazy val crypto = new Crypto(this) - private lazy val bluetoothInterface = new BluetoothInterface(this, crypto) + private lazy val btInterface = new BluetoothInterface(this, crypto) - private val notificationIdGenerator = Stream.from(100) + private lazy val router = new Router(connections, sendVia) + + private lazy val seqNumGenerator = new SeqNumGenerator(this) + + private val notificationIdGenerator = Stream.from(100).iterator /** * For this (and [[messageListeners]], functions would be useful instead of instances, @@ -77,7 +81,7 @@ class ChatService extends Service { * * This is for user names that were received during runtime, and is not persistent. */ - private var connections = Set[User]() + private var knownUsers = Set[User]() /** * Generates keys and starts Bluetooth interface. @@ -95,13 +99,13 @@ class ChatService extends Service { Future { crypto.generateLocalKeys() - bluetoothInterface.create() + btInterface.create() Log.i(Tag, "Service started, address is " + Crypto.getLocalAddress(this)) } } override def onDestroy(): Unit = { - bluetoothInterface.destroy() + btInterface.destroy() } override def onStartCommand(intent: Intent, flags: Int, startId: Int) = Service.START_STICKY @@ -127,28 +131,35 @@ class ChatService extends Service { * Sends a new message to the given target address. */ def sendTo(target: Address, body: MessageBody): Unit = { - if (!bluetoothInterface.getConnections.contains(target)) + if (!btInterface.getConnections.contains(target)) return val header = new MessageHeader(body.messageType, MessageHeader.DefaultHopLimit, - Crypto.getLocalAddress(this), target, 0, 0) + Crypto.getLocalAddress(this), target, seqNumGenerator.next()) val msg = new Message(header, body) val encrypted = crypto.encrypt(crypto.sign(msg)) - bluetoothInterface.send(encrypted) + router.onReceive(encrypted) onNewMessage(msg) } + private def sendVia(nextHop: Address, msg: Message) = + btInterface.send(nextHop, msg) + /** * Decrypts and verifies incoming messages, forwards valid ones to [[onNewMessage()]]. */ def onMessageReceived(msg: Message): Unit = { - val decrypted = crypto.decrypt(msg) - if (!crypto.verify(decrypted)) { - Log.i(Tag, "Ignoring message with invalid signature from " + msg.Header.Origin) - return + if (msg.Header.Target == Crypto.getLocalAddress(this)) { + val decrypted = crypto.decrypt(msg) + if (!crypto.verify(decrypted)) { + Log.i(Tag, "Ignoring message with invalid signature from " + msg.Header.Origin) + return + } + onNewMessage(decrypted) + } else { + router.onReceive(msg) } - onNewMessage(decrypted) } /** @@ -157,7 +168,7 @@ class ChatService extends Service { private def onNewMessage(msg: Message): Unit = msg.Body match { case name: UserName => val contact = new User(msg.Header.Origin, name.Name) - connections += contact + knownUsers += contact if (database.getContact(msg.Header.Origin).nonEmpty) database.changeContactName(contact) @@ -180,7 +191,7 @@ class ChatService extends Service { .setAutoCancel(true) .build() val nm = getSystemService(Context.NOTIFICATION_SERVICE).asInstanceOf[NotificationManager] - nm.notify(notificationIdGenerator.iterator.next(), notification) + nm.notify(notificationIdGenerator.next(), notification) case _ => MainHandler.post(new Runnable { override def run(): Unit = @@ -202,6 +213,15 @@ class ChatService extends Service { * @return True if the connection is valid */ def onConnectionOpened(msg: Message): Boolean = { + val maxConnections = PreferenceManager + .getDefaultSharedPreferences(this) + .getString(SettingsFragment.MaxConnections, Int.MaxValue.toString) + .toInt + if (connections().size == maxConnections) { + Log.i(Tag, "Maximum number of connections reached") + false + } + val info = msg.Body.asInstanceOf[ConnectionInfo] val sender = crypto.calculateAddress(info.key) if (sender == Address.Broadcast || sender == Address.Null) { @@ -237,19 +257,10 @@ class ChatService extends Service { .foreach(_.apply().onConnectionsChanged()) } - /** - * Returns all direct neighbors. - */ - def getConnections: Set[User] = { - bluetoothInterface.getConnections.map{ address => - (database.getContacts ++ connections).find(_.Address == address) match { - case Some(contact) => contact - case None => new User(address, address.toString) - } - } - } + def connections() = + btInterface.getConnections def getUser(address: Address) = - getConnections.find(_.Address == address).getOrElse(new User(address, address.toString)) + knownUsers.find(_.Address == address).getOrElse(new User(address, address.toString)) } diff --git a/app/src/main/scala/com/nutomic/ensichat/protocol/Router.scala b/app/src/main/scala/com/nutomic/ensichat/protocol/Router.scala new file mode 100644 index 0000000..faf0633 --- /dev/null +++ b/app/src/main/scala/com/nutomic/ensichat/protocol/Router.scala @@ -0,0 +1,47 @@ +package com.nutomic.ensichat.protocol + +import com.nutomic.ensichat.protocol.messages.{Message, MessageHeader} + +/** + * Forwards messages to all connected devices. + */ +class Router(activeConnections: () => Set[Address], send: (Address, Message) => Unit) { + + private var messageSeen = Set[(Address, Int)]() + + def onReceive(msg: Message): Unit = { + val info = (msg.Header.Origin, msg.Header.SeqNum) + if (messageSeen.contains(info)) + return + + activeConnections().foreach(a => send(a, msg)) + + trimMessageSeen(info._1, info._2) + messageSeen += info + } + + /** + * Removes old entries from [[messageSeen]]. + * + * Only the last half of possible sequence number values are kept. For example, if sequence + * numbers are between 0 and 10, and a new message with sequence number 6 arrives, all entries + * for messages with sequence numbers outside [2, 6] are removed. + */ + private def trimMessageSeen(a1: Address, s1: Int): Unit = { + messageSeen = messageSeen.filter { case (a2, s2) => + if (a1 != a2) + true + + // True if [[s2]] is between {{{MessageHeader.SeqNumRange.size / 2}}} and + // [[MessageHeader.SeqNumRange.size]]. + if (s1 > MessageHeader.SeqNumRange.size / 2) { + // True if [[s2]] is between {{{s1 - MessageHeader.SeqNumRange.size / 2}}} and [[s1]]. + s1 - MessageHeader.SeqNumRange.size / 2 < s2 && s2 < s1 + } else { + // True if [[s2]] is *not* between [[s1]] and {{{s1 + MessageHeader.SeqNumRange.size / 2}}}. + s2 < s1 || s2 > s1 + MessageHeader.SeqNumRange.size / 2 + } + } + } + +} \ No newline at end of file diff --git a/app/src/main/scala/com/nutomic/ensichat/protocol/SeqNumGenerator.scala b/app/src/main/scala/com/nutomic/ensichat/protocol/SeqNumGenerator.scala new file mode 100644 index 0000000..f7396ed --- /dev/null +++ b/app/src/main/scala/com/nutomic/ensichat/protocol/SeqNumGenerator.scala @@ -0,0 +1,24 @@ +package com.nutomic.ensichat.protocol + +import android.content.Context +import android.preference.PreferenceManager +import com.nutomic.ensichat.protocol.messages.MessageHeader + +/** + * Generates sequence numbers acorrding to protocol, which are stored persistently. + */ +class SeqNumGenerator(context: Context) { + + private val KeySequenceNumber = "sequence_number" + + private val pm = PreferenceManager.getDefaultSharedPreferences(context) + + private var current = pm.getInt(KeySequenceNumber, MessageHeader.SeqNumRange.head) + + def next(): Int = { + current += 1 + pm.edit().putInt(KeySequenceNumber, current) + current + } + +} \ No newline at end of file diff --git a/app/src/main/scala/com/nutomic/ensichat/protocol/messages/MessageHeader.scala b/app/src/main/scala/com/nutomic/ensichat/protocol/messages/MessageHeader.scala index 4596dac..c706b73 100644 --- a/app/src/main/scala/com/nutomic/ensichat/protocol/messages/MessageHeader.scala +++ b/app/src/main/scala/com/nutomic/ensichat/protocol/messages/MessageHeader.scala @@ -11,6 +11,8 @@ object MessageHeader { val DefaultHopLimit = 20 val Version = 0 + + val SeqNumRange = 0 until ((2 << 16) - 1) class ParseMessageException(detailMessage: String) extends RuntimeException(detailMessage) { } @@ -33,7 +35,9 @@ object MessageHeader { val origin = new Address(BufferUtils.getByteArray(b, Address.Length)) val target = new Address(BufferUtils.getByteArray(b, Address.Length)) - new MessageHeader(messageType, hopLimit, origin, target, length, hopCount) + val seqNum = BufferUtils.getUnsignedShort(b) + + new MessageHeader(messageType, hopLimit, origin, target, seqNum, length, hopCount) } } @@ -45,6 +49,7 @@ case class MessageHeader(MessageType: Int, HopLimit: Int, Origin: Address, Target: Address, + SeqNum: Int, Length: Long = -1, HopCount: Int = 0) { @@ -63,7 +68,8 @@ case class MessageHeader(MessageType: Int, b.put(Origin.Bytes) b.put(Target.Bytes) - BufferUtils.putUnsignedInt(b, 0) + BufferUtils.putUnsignedShort(b, SeqNum) + BufferUtils.putUnsignedShort(b, 0) b.array() }