Added flood-based routing.

This commit is contained in:
Felix Ableitner 2015-02-09 21:33:27 +01:00
parent 61f21c0be1
commit 3567b88c0d
14 changed files with 240 additions and 49 deletions

View file

@ -71,7 +71,7 @@ header is in network byte order, i.e. big endian.
| Target Address | | Target Address |
| | | |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Reserved | | Sequence Number | Reserved |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Version specifies the protocol version number. This is currently 0. A Version specifies the protocol version number. This is currently 0. A
@ -100,6 +100,11 @@ message.
Target Address is the address of the node that should receive the Target Address is the address of the node that should receive the
message. message.
Sequence number is set by the sender, and MUST increment by 1 for
each new message sent (after 2^16-1 comes 0 again). It SHOULD
be persistent during restarts. It is used by intermediate nodes
to avoid forwarding the same message multiple times.
### Encryption Data ### Encryption Data

View file

@ -0,0 +1,81 @@
package com.nutomic.ensichat.protocol
import android.test.AndroidTestCase
import android.util.Log
import com.nutomic.ensichat.protocol.messages._
import junit.framework.Assert._
class RouterTest extends AndroidTestCase {
private def neighbors() = Set[Address](AddressTest.a1, AddressTest.a2, AddressTest.a3)
private val msg = generateMessage(AddressTest.a1, AddressTest.a4, 1)
/**
* Messages should be sent to all neighbors.
*/
def testFlooding(): Unit = {
var sentTo = Set[Address]()
val router: Router = new Router(neighbors,
(a, m) => {
assertEquals(msg, m)
sentTo += a
})
router.onReceive(msg)
assertEquals(neighbors(), sentTo)
}
/**
* Messages from different senders with the same sequence number should be forwarded.
*/
def testDifferentSenders(): Unit = {
var sentTo = Set[Address]()
val router: Router = new Router(neighbors, (a, m) => sentTo += a)
router.onReceive(msg)
assertEquals(neighbors(), sentTo)
sentTo = Set[Address]()
router.onReceive(generateMessage(AddressTest.a2, AddressTest.a4, 1))
assertEquals(neighbors(), sentTo)
}
/**
* Messages from the same sender with the same sequence number should be ignored.
*/
def testIgnores(): Unit = {
var sentTo = Set[Address]()
val router: Router = new Router(neighbors, (a, m) => sentTo += a)
router.onReceive(msg)
assertEquals(neighbors(), sentTo)
sentTo = Set[Address]()
router.onReceive(generateMessage(AddressTest.a1, AddressTest.a2, 1))
assertTrue(sentTo.isEmpty)
}
def testDiscardOldIgnores(): Unit = {
def test(first: Int, second: Int) {
var sentTo = Set[Address]()
val router: Router = new Router(neighbors, (a, m) => sentTo += a)
router.onReceive(generateMessage(AddressTest.a1, AddressTest.a3, first))
router.onReceive(generateMessage(AddressTest.a1, AddressTest.a3, second))
sentTo = Set[Address]()
router.onReceive(generateMessage(AddressTest.a1, AddressTest.a3, first))
assertEquals(neighbors(), sentTo)
}
test(1, MessageHeader.SeqNumRange.last)
test(MessageHeader.SeqNumRange.last / 2, MessageHeader.SeqNumRange.last)
test(MessageHeader.SeqNumRange.last / 2, 1)
}
private def generateMessage(sender: Address, receiver: Address, seqNum: Int): Message = {
val header = new MessageHeader(0, MessageHeader.DefaultHopLimit, sender, receiver, seqNum)
new Message(header, new CryptoData(None, None), new UserName(""))
}
}

View file

@ -3,7 +3,9 @@ package com.nutomic.ensichat.protocol
object UserTest { object UserTest {
val u1 = new User(AddressTest.a1, "one") val u1 = new User(AddressTest.a1, "one")
val u2 = new User(AddressTest.a2, "two") val u2 = new User(AddressTest.a2, "two")
val u3 = new User(AddressTest.a3, "three")
} }

View file

@ -8,15 +8,15 @@ import junit.framework.Assert._
object MessageHeaderTest { object MessageHeaderTest {
val h1 = new MessageHeader(Text.Type, MessageHeader.DefaultHopLimit, AddressTest.a1, val h1 = new MessageHeader(Text.Type, MessageHeader.DefaultHopLimit, AddressTest.a1,
AddressTest.a2, 1234, 0) AddressTest.a2, 5, 1234, 0)
val h2 = new MessageHeader(Text.Type, 0, AddressTest.a1, AddressTest.a3, 8765, 234) val h2 = new MessageHeader(Text.Type, 0, AddressTest.a1, AddressTest.a3, 30000, 8765, 234)
val h3 = new MessageHeader(Text.Type, 0xff, AddressTest.a4, AddressTest.a2, 0, 56) val h3 = new MessageHeader(Text.Type, 0xff, AddressTest.a4, AddressTest.a2, 250, 0, 56)
val h4 = new MessageHeader(0xfff, 0, Address.Null, Address.Broadcast, 0, 0xff) val h4 = new MessageHeader(0xfff, 0, Address.Null, Address.Broadcast, MessageHeader.SeqNumRange.last, 0, 0xff)
val h5 = new MessageHeader(ConnectionInfo.Type, 0xff, Address.Broadcast, Address.Null, 0xffff, 0) val h5 = new MessageHeader(ConnectionInfo.Type, 0xff, Address.Broadcast, Address.Null, 0, 0xffff, 0)
val headers = Set(h1, h2, h3, h4, h5) val headers = Set(h1, h2, h3, h4, h5)

View file

@ -62,6 +62,9 @@
<!-- Preference title --> <!-- Preference title -->
<string name="scan_interval_seconds">Scan Interval (seconds)</string> <string name="scan_interval_seconds">Scan Interval (seconds)</string>
<!-- Preference title (debug only)-->
<string name="max_connections">Maximum Number of Connections</string>
<!-- ChatService --> <!-- ChatService -->

View file

@ -12,5 +12,11 @@
android:defaultValue="15" android:defaultValue="15"
android:inputType="number" android:inputType="number"
android:numeric="integer" /> android:numeric="integer" />
<EditTextPreference
android:title="@string/max_connections"
android:key="max_connections"
android:inputType="number"
android:numeric="integer" />
</PreferenceScreen> </PreferenceScreen>

View file

@ -7,7 +7,7 @@ import android.view._
import android.widget.AdapterView.OnItemClickListener import android.widget.AdapterView.OnItemClickListener
import android.widget._ import android.widget._
import com.nutomic.ensichat.R import com.nutomic.ensichat.R
import com.nutomic.ensichat.protocol.ChatService import com.nutomic.ensichat.protocol.{User, ChatService}
import com.nutomic.ensichat.protocol.messages.RequestAddContact import com.nutomic.ensichat.protocol.messages.RequestAddContact
import com.nutomic.ensichat.util.Database.OnContactsUpdatedListener import com.nutomic.ensichat.util.Database.OnContactsUpdatedListener
import com.nutomic.ensichat.util.UsersAdapter import com.nutomic.ensichat.util.UsersAdapter
@ -69,7 +69,8 @@ class AddContactsActivity extends EnsiChatActivity with ChatService.OnConnection
runOnUiThread(new Runnable { runOnUiThread(new Runnable {
override def run(): Unit = { override def run(): Unit = {
adapter.clear() adapter.clear()
(service.getConnections -- service.database.getContacts).foreach(adapter.add) (service.connections().map(a => service.getUser(a)) -- service.database.getContacts)
.foreach(adapter.add)
} }
}) })
} }

View file

@ -180,10 +180,10 @@ class BluetoothInterface(Service: ChatService, Crypto: Crypto) extends Interface
} }
/** /**
* Sends the message to the target address specified in the message header. * Sends the message to nextHop.
*/ */
override def send(msg: Message): Unit = override def send(nextHop: Address, msg: Message): Unit =
connections.apply(addressDeviceMap.get(msg.Header.Target)).send(msg) connections.get(addressDeviceMap.get(nextHop)).foreach(_.send(msg))
/** /**
* Returns all active Bluetooth connections. * Returns all active Bluetooth connections.

View file

@ -53,13 +53,11 @@ class TransferThread(device: Device, socket: BluetoothSocket, Handler: Bluetooth
val msg = Message.read(inStream) val msg = Message.read(inStream)
onReceive(msg, device.Id) onReceive(msg, device.Id)
Log.v(Tag, "Receiving " + msg)
} }
} catch { } catch {
case e: ReadMessageException => case e @ (_: ReadMessageException | _: IOException) =>
Log.i(Tag, "Failed to read message", e)
case e: IOException =>
Log.w(Tag, "Failed to read incoming message", e) Log.w(Tag, "Failed to read incoming message", e)
close()
return return
} }
} }
@ -69,6 +67,7 @@ class TransferThread(device: Device, socket: BluetoothSocket, Handler: Bluetooth
def send(msg: Message): Unit = { def send(msg: Message): Unit = {
try { try {
outStream.write(msg.write) outStream.write(msg.write)
Log.v(Tag, "Sending " + msg)
} catch { } catch {
case e: IOException => Log.e(Tag, "Failed to write message", e) case e: IOException => Log.e(Tag, "Failed to write message", e)
} }

View file

@ -13,7 +13,9 @@ object SettingsFragment {
val KeyUserName = "user_name" val KeyUserName = "user_name"
val KeyScanInterval = "scan_interval_seconds" val KeyScanInterval = "scan_interval_seconds"
val MaxConnections = "max_connections"
} }
/** /**
@ -25,14 +27,18 @@ class SettingsFragment extends PreferenceFragment with OnPreferenceChangeListene
super.onCreate(savedInstanceState) super.onCreate(savedInstanceState)
addPreferencesFromResource(R.xml.settings) addPreferencesFromResource(R.xml.settings)
val name = findPreference(KeyUserName) val name = findPreference(KeyUserName)
val scanInterval = findPreference(KeyScanInterval) val scanInterval = findPreference(KeyScanInterval)
val maxConnections = findPreference(MaxConnections)
name.setOnPreferenceChangeListener(this) name.setOnPreferenceChangeListener(this)
scanInterval.setOnPreferenceChangeListener(this) scanInterval.setOnPreferenceChangeListener(this)
maxConnections.setOnPreferenceChangeListener(this)
val pm = PreferenceManager.getDefaultSharedPreferences(getActivity) val pm = PreferenceManager.getDefaultSharedPreferences(getActivity)
name.setSummary(pm.getString(KeyUserName, "")) name.setSummary(pm.getString(KeyUserName, ""))
scanInterval.setSummary(pm.getString(KeyScanInterval, "15")) scanInterval.setSummary(pm.getString(KeyScanInterval, "15"))
maxConnections.setDefaultValue(Int.MaxValue)
maxConnections.setSummary(pm.getString(MaxConnections, Int.MaxValue.toString))
} }
/** /**

View file

@ -26,7 +26,7 @@ object ChatService {
def destroy(): Unit def destroy(): Unit
def send(msg: Message): Unit def send(nextHop: Address, msg: Message): Unit
} }
@ -59,9 +59,13 @@ class ChatService extends Service {
private lazy val crypto = new Crypto(this) private lazy val crypto = new Crypto(this)
private lazy val bluetoothInterface = new BluetoothInterface(this, crypto) private lazy val btInterface = new BluetoothInterface(this, crypto)
private val notificationIdGenerator = Stream.from(100) private lazy val router = new Router(connections, sendVia)
private lazy val seqNumGenerator = new SeqNumGenerator(this)
private val notificationIdGenerator = Stream.from(100).iterator
/** /**
* For this (and [[messageListeners]], functions would be useful instead of instances, * For this (and [[messageListeners]], functions would be useful instead of instances,
@ -77,7 +81,7 @@ class ChatService extends Service {
* *
* This is for user names that were received during runtime, and is not persistent. * This is for user names that were received during runtime, and is not persistent.
*/ */
private var connections = Set[User]() private var knownUsers = Set[User]()
/** /**
* Generates keys and starts Bluetooth interface. * Generates keys and starts Bluetooth interface.
@ -95,13 +99,13 @@ class ChatService extends Service {
Future { Future {
crypto.generateLocalKeys() crypto.generateLocalKeys()
bluetoothInterface.create() btInterface.create()
Log.i(Tag, "Service started, address is " + Crypto.getLocalAddress(this)) Log.i(Tag, "Service started, address is " + Crypto.getLocalAddress(this))
} }
} }
override def onDestroy(): Unit = { override def onDestroy(): Unit = {
bluetoothInterface.destroy() btInterface.destroy()
} }
override def onStartCommand(intent: Intent, flags: Int, startId: Int) = Service.START_STICKY override def onStartCommand(intent: Intent, flags: Int, startId: Int) = Service.START_STICKY
@ -127,28 +131,35 @@ class ChatService extends Service {
* Sends a new message to the given target address. * Sends a new message to the given target address.
*/ */
def sendTo(target: Address, body: MessageBody): Unit = { def sendTo(target: Address, body: MessageBody): Unit = {
if (!bluetoothInterface.getConnections.contains(target)) if (!btInterface.getConnections.contains(target))
return return
val header = new MessageHeader(body.messageType, MessageHeader.DefaultHopLimit, val header = new MessageHeader(body.messageType, MessageHeader.DefaultHopLimit,
Crypto.getLocalAddress(this), target, 0, 0) Crypto.getLocalAddress(this), target, seqNumGenerator.next())
val msg = new Message(header, body) val msg = new Message(header, body)
val encrypted = crypto.encrypt(crypto.sign(msg)) val encrypted = crypto.encrypt(crypto.sign(msg))
bluetoothInterface.send(encrypted) router.onReceive(encrypted)
onNewMessage(msg) onNewMessage(msg)
} }
private def sendVia(nextHop: Address, msg: Message) =
btInterface.send(nextHop, msg)
/** /**
* Decrypts and verifies incoming messages, forwards valid ones to [[onNewMessage()]]. * Decrypts and verifies incoming messages, forwards valid ones to [[onNewMessage()]].
*/ */
def onMessageReceived(msg: Message): Unit = { def onMessageReceived(msg: Message): Unit = {
val decrypted = crypto.decrypt(msg) if (msg.Header.Target == Crypto.getLocalAddress(this)) {
if (!crypto.verify(decrypted)) { val decrypted = crypto.decrypt(msg)
Log.i(Tag, "Ignoring message with invalid signature from " + msg.Header.Origin) if (!crypto.verify(decrypted)) {
return Log.i(Tag, "Ignoring message with invalid signature from " + msg.Header.Origin)
return
}
onNewMessage(decrypted)
} else {
router.onReceive(msg)
} }
onNewMessage(decrypted)
} }
/** /**
@ -157,7 +168,7 @@ class ChatService extends Service {
private def onNewMessage(msg: Message): Unit = msg.Body match { private def onNewMessage(msg: Message): Unit = msg.Body match {
case name: UserName => case name: UserName =>
val contact = new User(msg.Header.Origin, name.Name) val contact = new User(msg.Header.Origin, name.Name)
connections += contact knownUsers += contact
if (database.getContact(msg.Header.Origin).nonEmpty) if (database.getContact(msg.Header.Origin).nonEmpty)
database.changeContactName(contact) database.changeContactName(contact)
@ -180,7 +191,7 @@ class ChatService extends Service {
.setAutoCancel(true) .setAutoCancel(true)
.build() .build()
val nm = getSystemService(Context.NOTIFICATION_SERVICE).asInstanceOf[NotificationManager] val nm = getSystemService(Context.NOTIFICATION_SERVICE).asInstanceOf[NotificationManager]
nm.notify(notificationIdGenerator.iterator.next(), notification) nm.notify(notificationIdGenerator.next(), notification)
case _ => case _ =>
MainHandler.post(new Runnable { MainHandler.post(new Runnable {
override def run(): Unit = override def run(): Unit =
@ -202,6 +213,15 @@ class ChatService extends Service {
* @return True if the connection is valid * @return True if the connection is valid
*/ */
def onConnectionOpened(msg: Message): Boolean = { def onConnectionOpened(msg: Message): Boolean = {
val maxConnections = PreferenceManager
.getDefaultSharedPreferences(this)
.getString(SettingsFragment.MaxConnections, Int.MaxValue.toString)
.toInt
if (connections().size == maxConnections) {
Log.i(Tag, "Maximum number of connections reached")
false
}
val info = msg.Body.asInstanceOf[ConnectionInfo] val info = msg.Body.asInstanceOf[ConnectionInfo]
val sender = crypto.calculateAddress(info.key) val sender = crypto.calculateAddress(info.key)
if (sender == Address.Broadcast || sender == Address.Null) { if (sender == Address.Broadcast || sender == Address.Null) {
@ -237,19 +257,10 @@ class ChatService extends Service {
.foreach(_.apply().onConnectionsChanged()) .foreach(_.apply().onConnectionsChanged())
} }
/** def connections() =
* Returns all direct neighbors. btInterface.getConnections
*/
def getConnections: Set[User] = {
bluetoothInterface.getConnections.map{ address =>
(database.getContacts ++ connections).find(_.Address == address) match {
case Some(contact) => contact
case None => new User(address, address.toString)
}
}
}
def getUser(address: Address) = def getUser(address: Address) =
getConnections.find(_.Address == address).getOrElse(new User(address, address.toString)) knownUsers.find(_.Address == address).getOrElse(new User(address, address.toString))
} }

View file

@ -0,0 +1,47 @@
package com.nutomic.ensichat.protocol
import com.nutomic.ensichat.protocol.messages.{Message, MessageHeader}
/**
* Forwards messages to all connected devices.
*/
class Router(activeConnections: () => Set[Address], send: (Address, Message) => Unit) {
private var messageSeen = Set[(Address, Int)]()
def onReceive(msg: Message): Unit = {
val info = (msg.Header.Origin, msg.Header.SeqNum)
if (messageSeen.contains(info))
return
activeConnections().foreach(a => send(a, msg))
trimMessageSeen(info._1, info._2)
messageSeen += info
}
/**
* Removes old entries from [[messageSeen]].
*
* Only the last half of possible sequence number values are kept. For example, if sequence
* numbers are between 0 and 10, and a new message with sequence number 6 arrives, all entries
* for messages with sequence numbers outside [2, 6] are removed.
*/
private def trimMessageSeen(a1: Address, s1: Int): Unit = {
messageSeen = messageSeen.filter { case (a2, s2) =>
if (a1 != a2)
true
// True if [[s2]] is between {{{MessageHeader.SeqNumRange.size / 2}}} and
// [[MessageHeader.SeqNumRange.size]].
if (s1 > MessageHeader.SeqNumRange.size / 2) {
// True if [[s2]] is between {{{s1 - MessageHeader.SeqNumRange.size / 2}}} and [[s1]].
s1 - MessageHeader.SeqNumRange.size / 2 < s2 && s2 < s1
} else {
// True if [[s2]] is *not* between [[s1]] and {{{s1 + MessageHeader.SeqNumRange.size / 2}}}.
s2 < s1 || s2 > s1 + MessageHeader.SeqNumRange.size / 2
}
}
}
}

View file

@ -0,0 +1,24 @@
package com.nutomic.ensichat.protocol
import android.content.Context
import android.preference.PreferenceManager
import com.nutomic.ensichat.protocol.messages.MessageHeader
/**
* Generates sequence numbers acorrding to protocol, which are stored persistently.
*/
class SeqNumGenerator(context: Context) {
private val KeySequenceNumber = "sequence_number"
private val pm = PreferenceManager.getDefaultSharedPreferences(context)
private var current = pm.getInt(KeySequenceNumber, MessageHeader.SeqNumRange.head)
def next(): Int = {
current += 1
pm.edit().putInt(KeySequenceNumber, current)
current
}
}

View file

@ -11,6 +11,8 @@ object MessageHeader {
val DefaultHopLimit = 20 val DefaultHopLimit = 20
val Version = 0 val Version = 0
val SeqNumRange = 0 until ((2 << 16) - 1)
class ParseMessageException(detailMessage: String) extends RuntimeException(detailMessage) { class ParseMessageException(detailMessage: String) extends RuntimeException(detailMessage) {
} }
@ -33,7 +35,9 @@ object MessageHeader {
val origin = new Address(BufferUtils.getByteArray(b, Address.Length)) val origin = new Address(BufferUtils.getByteArray(b, Address.Length))
val target = new Address(BufferUtils.getByteArray(b, Address.Length)) val target = new Address(BufferUtils.getByteArray(b, Address.Length))
new MessageHeader(messageType, hopLimit, origin, target, length, hopCount) val seqNum = BufferUtils.getUnsignedShort(b)
new MessageHeader(messageType, hopLimit, origin, target, seqNum, length, hopCount)
} }
} }
@ -45,6 +49,7 @@ case class MessageHeader(MessageType: Int,
HopLimit: Int, HopLimit: Int,
Origin: Address, Origin: Address,
Target: Address, Target: Address,
SeqNum: Int,
Length: Long = -1, Length: Long = -1,
HopCount: Int = 0) { HopCount: Int = 0) {
@ -63,7 +68,8 @@ case class MessageHeader(MessageType: Int,
b.put(Origin.Bytes) b.put(Origin.Bytes)
b.put(Target.Bytes) b.put(Target.Bytes)
BufferUtils.putUnsignedInt(b, 0) BufferUtils.putUnsignedShort(b, SeqNum)
BufferUtils.putUnsignedShort(b, 0)
b.array() b.array()
} }