Added message buffer with automatic retry.

This commit is contained in:
Felix Ableitner 2016-06-02 21:36:48 +02:00
parent a0cd4248d9
commit 23ee0f6da7
7 changed files with 197 additions and 45 deletions

View file

@ -1,18 +1,18 @@
package com.nutomic.ensichat.core
import java.security.InvalidKeyException
import java.util.Date
import java.util.{TimerTask, Timer, Date}
import com.nutomic.ensichat.core.body._
import com.nutomic.ensichat.core.header.{ContentHeader, MessageHeader}
import com.nutomic.ensichat.core.interfaces._
import com.nutomic.ensichat.core.internet.InternetInterface
import com.nutomic.ensichat.core.util.{Database, FutureHelper, LocalRoutesInfo, RouteMessageInfo}
import com.nutomic.ensichat.core.util._
import com.typesafe.scalalogging.Logger
import org.joda.time.{DateTime, Duration}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
/**
* High-level handling of all message transfers and callbacks.
@ -27,7 +27,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
private val logger = Logger(this.getClass)
private val MissingRouteMessageTimeout = 5.minutes
private val CheckMessageRetryInterval = Duration.standardMinutes(1)
private var transmissionInterfaces = Set[TransmissionInterface]()
@ -41,13 +41,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
(a, m) => transmissionInterfaces.foreach(_.send(a, m)),
noRouteFound)
/**
* Contains messages that couldn't be forwarded because we don't know a route.
*
* These will be buffered until we receive a [[RouteReply]] for the target, or when until the
* message has couldn't be forwarded after [[MissingRouteMessageTimeout]].
*/
private var missingRouteMessages = Set[(Message, Date)]()
private val messageBuffer = new MessageBuffer(requestRoute)
/**
* Holds all known users.
@ -76,6 +70,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
}
def stop(): Unit = {
messageBuffer.stop()
transmissionInterfaces.foreach(_.destroy())
database.close()
}
@ -157,6 +152,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
case rreq: RouteRequest =>
logger.trace(s"Received $msg")
localRoutesInfo.addRoute(msg.header.origin, rreq.originSeqNum, previousHop, rreq.originMetric)
resendMissingRouteMessages()
// TODO: Respecting this causes the RERR test to fail. We have to fix the implementation
// of isMessageRedundant() without breaking the test.
if (routeMessageInfo.isMessageRedundant(msg)) {
@ -239,27 +235,17 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
}
/**
* Tries to send messages in [[missingRouteMessages]] again, after we acquired a new route.
*
* Before checking [[missingRouteMessages]], those older than [[MissingRouteMessageTimeout]]
* are removed.
* Tries to send messages in [[MessageBuffer]] again, after we acquired a new route.
*/
private def resendMissingRouteMessages(): Unit = {
// resend messages if possible
val date = new Date()
missingRouteMessages = missingRouteMessages.filter { e =>
val removeTime = new Date(e._2.getTime + MissingRouteMessageTimeout.toMillis)
removeTime.after(date)
}
val m = missingRouteMessages.filter(m => localRoutesInfo.getRoute(m._1.header.target).isDefined)
m.foreach( m => router.forwardMessage(m._1))
missingRouteMessages --= m
localRoutesInfo.getAllAvailableRoutes
.flatMap( r => messageBuffer.getMessages(r.destination))
.foreach(router.forwardMessage(_))
}
private def noRouteFound(message: Message): Unit = {
if (message.header.origin == crypto.localAddress) {
missingRouteMessages += ((message, new Date()))
messageBuffer.addMessage(message)
requestRoute(message.header.target)
} else
routeError(message.header.target, Option(message.header.origin))
@ -330,6 +316,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
sendTo(sender, new UserInfo(settings.get(SettingsInterface.KeyUserName, ""),
settings.get(SettingsInterface.KeyUserStatus, "")))
callbacks.onConnectionsChanged()
resendMissingRouteMessages()
true
}

View file

@ -61,16 +61,23 @@ private[core] class LocalRoutesInfo(activeConnections: () => Set[Address]) {
routes += entry
}
def getRoute(destination: Address): Option[RouteEntry] = {
if (activeConnections().contains(destination))
return Option(new RouteEntry(destination, 0, destination, DateTime.now, DateTime.now, 1, Idle))
/**
* Returns a list of all known routes (excluding invalid), ordered by best metric.
*/
def getAllAvailableRoutes: List[RouteEntry] = {
handleTimeouts()
val r = routes.toList
val neighbors = activeConnections()
.map(c => new RouteEntry(c, 0, c, DateTime.now, DateTime.now, 1, Idle))
(neighbors ++ routes).toList
.filter(r => r.state != Invalid)
.sortWith(_.metric < _.metric)
.find( r => r.destination == destination && r.state != Invalid)
}
if (r.isDefined)
def getRoute(destination: Address): Option[RouteEntry] = {
val r = getAllAvailableRoutes
.find( r => r.destination == destination)
if (r.isDefined && routes.contains(r.get))
routes = routes -- r + r.get.copy(state = Active, lastUsed = DateTime.now)
r
}

View file

@ -0,0 +1,91 @@
package com.nutomic.ensichat.core.util
import java.util.{TimerTask, Timer}
import com.nutomic.ensichat.core.{Address, Message}
import org.joda.time.{DateTime, Duration}
/**
* Contains messages that couldn't be forwarded because we don't know a route.
*/
class MessageBuffer(retryMessageSending: (Address) => Unit) {
/**
* The maximum number of times we retry to deliver a message.
*/
private val MaxRetryCount = 6
private val timer = new Timer()
private case class BufferEntry(message: Message, added: DateTime, retryCount: Int)
private var values = Set[BufferEntry]()
private var isStopped = false
def stop(): Unit = {
isStopped = true
timer.cancel()
}
def addMessage(msg: Message): Unit = {
val newEntry = new BufferEntry(msg, DateTime.now, 0)
values += newEntry
retryMessage(newEntry)
}
/**
* Calculates the duration until the next retry, measured from the time the message was added.
*/
private def calculateNextRetryOffset(retryCount: Int) =
Duration.standardSeconds(10 ^ (retryCount + 1))
/**
* Starts a timer to retry the route discovery.
*
* The delivery will not be retried if the [[stop]] was called, the message has timed out from
* the buffer, the message was sent, or a newer message for the same destination was added.
*/
private def retryMessage(entry: BufferEntry) {
timer.schedule(new TimerTask {
override def run(): Unit = {
if (isStopped)
return
// New entry was added for the same destination, don't retry here any more.
val newerEntryExists = values
.filter(_.message.header.target == entry.message.header.target)
.map(_.added)
.exists(_.isAfter(entry.added))
if (newerEntryExists)
return
// Don't retry if message was sent in the mean time, or message timed out.
handleTimeouts()
if (!values.map(_.message).contains(entry.message))
return
retryMessageSending(entry.message.header.target)
val updated = entry.copy(retryCount = entry.retryCount + 1)
retryMessage(updated)
}
}, calculateNextRetryOffset(entry.retryCount).getMillis)
}
/**
* Returns all buffered messages for destination, and removes them from the buffer.
*/
def getMessages(destination: Address): Set[Message] = {
handleTimeouts()
val ret = values.filter(_.message.header.target == destination)
values --= ret
ret.map(_.message)
}
private def handleTimeouts(): Unit = {
values = values.filter { e =>
e.retryCount < MaxRetryCount
}
}
}

View file

@ -42,4 +42,20 @@ class LocalRoutesInfoTest extends TestCase {
assertEquals(None, routesInfo.getRoute(AddressTest.a3))
}
}
def testNeighbor(): Unit = {
val routesInfo = new LocalRoutesInfo(connections)
val r1 = routesInfo.getRoute(AddressTest.a1)
assertTrue(r1.isDefined)
assertEquals(AddressTest.a1, r1.get.destination)
assertEquals(1, r1.get.metric)
}
def testGetAllAvailableRoutes(): Unit = {
val routesInfo = new LocalRoutesInfo(connections)
routesInfo.addRoute(AddressTest.a3, 0, AddressTest.a1, 1)
val destinations = routesInfo.getAllAvailableRoutes.map(_.destination).toSet
assertEquals(connections() + AddressTest.a3, destinations)
}
}

View file

@ -0,0 +1,30 @@
package com.nutomic.ensichat.core.util
import java.util.concurrent.{TimeUnit, CountDownLatch}
import com.nutomic.ensichat.core.MessageTest
import junit.framework.TestCase
import org.junit.Assert._
class MessageBufferTest extends TestCase {
def testGetMessages(): Unit = {
val buffer = new MessageBuffer(() => _)
buffer.addMessage(MessageTest.m1)
buffer.addMessage(MessageTest.m2)
val msgs = buffer.getMessages(MessageTest.m1.header.target)
assertEquals(1, msgs.size)
assertEquals(MessageTest.m1, msgs.head)
}
def testRetryMessage(): Unit = {
val latch = new CountDownLatch(1)
val buffer = new MessageBuffer({e =>
assertEquals(MessageTest.m1.header.target, e)
latch.countDown()
})
buffer.addMessage(MessageTest.m1)
assertTrue(latch.await(15, TimeUnit.SECONDS))
}
}

View file

@ -1,9 +1,8 @@
package com.nutomic.ensichat.integration
import java.io.File
import java.util.concurrent.{LinkedBlockingDeque, LinkedBlockingQueue}
import java.util.concurrent.LinkedBlockingQueue
import com.nutomic.ensichat.core.body.{RouteError, RouteRequest, RouteReply}
import com.nutomic.ensichat.core.interfaces.{CallbackInterface, SettingsInterface}
import com.nutomic.ensichat.core.util.Database
import com.nutomic.ensichat.core.{ConnectionHandler, Crypto, Message}
@ -43,8 +42,6 @@ object LocalNode {
* @param configFolder Folder where keys and configuration should be stored.
*/
class LocalNode(val index: Int, configFolder: File) extends CallbackInterface {
import com.nutomic.ensichat.integration.LocalNode.EventType._
private val databaseFile = new File(configFolder, "database")
private val keyFolder = new File(configFolder, "keys")

View file

@ -1,13 +1,14 @@
package com.nutomic.ensichat.integration
import java.io.File
import java.util.{TimerTask, Timer}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import com.nutomic.ensichat.core.Crypto
import com.nutomic.ensichat.core.body.Text
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.duration.{DurationLong, Duration}
import scala.concurrent.{Await, Future}
import scala.util.Try
import scalax.file.Path
@ -23,12 +24,36 @@ object Main extends App {
System.out.println("\n\nAll nodes connected!\n\n")
sendMessages(nodes)
System.out.println("\n\nAll messages sent!\n\n")
System.out.println("\n\nMessages sent!\n\n")
// Stop node 1, forcing route errors and messages to use the (longer) path via nodes 7 and 8.
nodes(1).connectionHandler.stop()
System.out.println("node 1 stopped")
System.out.println("Node 1 stopped")
sendMessages(nodes)
System.out.println("\n\nMessages after route change sent!\n\n")
// Create new node 9, send message from node 0 to its address, before actually connecting it.
// The message is automatically delivered when node 9 connects as neighbor.
val node9 = Await.result(createNode(9), Duration.Inf)
val timer = new Timer()
timer.schedule(new TimerTask {
override def run(): Unit = {
connectNodes(nodes(0), node9)
}
}, Duration(10, TimeUnit.SECONDS).toMillis)
sendMessage(nodes(0), node9, 30)
// Create new node 10, send message from node 7 to its address, before connecting it to the mesh.
// The message is delivered after node 7 starts a route discovery triggered by the message buffer.
val node10 = Await.result(createNode(10), Duration.Inf)
timer.schedule(new TimerTask {
override def run(): Unit = {
connectNodes(nodes(0), node10)
timer.cancel()
}
}, Duration(5, TimeUnit.SECONDS).toMillis)
sendMessage(nodes(7), node10, 30)
System.out.println("\n\nMessages after delay sent!\n\n")
/**
* Creates a new mesh with a predefined layout.
@ -40,7 +65,7 @@ object Main extends App {
* \ / | |
* 2 56
*
* @return List of [[LocalNode]]s, ordered from 0 to 7.
* @return List of [[LocalNode]]s, ordered from 0 to 8.
*/
private def createMesh(): Seq[LocalNode] = {
val nodes = Await.result(Future.sequence(0.to(8).map(createNode)), Duration.Inf)
@ -100,7 +125,7 @@ object Main extends App {
}
private def sendMessage(from: LocalNode, to: LocalNode): Unit = {
private def sendMessage(from: LocalNode, to: LocalNode, waitSeconds: Int = 1): Unit = {
addKey(to.crypto, from.crypto)
addKey(from.crypto, to.crypto)
@ -124,13 +149,12 @@ object Main extends App {
assert(exists, s"message from ${from.index} did not arrive at ${to.index}")
latch.countDown()
}
assert(latch.await(1000, TimeUnit.MILLISECONDS))
assert(latch.await(waitSeconds, TimeUnit.SECONDS))
}
private def addKey(addTo: Crypto, addFrom: Crypto): Unit = {
if (Try(addTo.getPublicKey(addFrom.localAddress)).isFailure)
addTo.addPublicKey(addFrom.localAddress, addFrom.getLocalPublicKey)
}
}