Merge branch 'message-confirmations'

This commit is contained in:
Felix Ableitner 2016-07-17 17:05:31 +02:00
commit 51b1feee4e
16 changed files with 207 additions and 75 deletions

View File

@ -85,7 +85,7 @@ version, type and ID, followed by the length of the message.
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Version | Protocol-Type | Tokens | Hop Limit |
| Version | Protocol-Type | Tokens | Hop Count |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
@ -327,3 +327,14 @@ Text the string to be transferred, encoded as UTF-8.
Contains the sender's name and status, which should be used for
display to users.
### MessageReceived (Content-Type = 8)
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Message ID |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Confirms that a previous content message has been received by the
target node. Message ID is the ID of that message.

View File

@ -32,11 +32,12 @@ class ChatService extends Service {
private val callbackHandler = new CallbackHandler(this, notificationHandler)
lazy val database = new Database(getDatabasePath("database"), callbackHandler)
private def settingsWrapper = new SettingsWrapper(this)
lazy val database = new Database(getDatabasePath("database"), settingsWrapper, callbackHandler)
private lazy val connectionHandler =
new ConnectionHandler(new SettingsWrapper(this), database, callbackHandler,
ChatService.newCrypto(this), 1)
new ConnectionHandler(settingsWrapper, database, callbackHandler, ChatService.newCrypto(this), 1)
private val networkReceiver = new NetworkChangedReceiver()

View File

@ -1,7 +1,6 @@
package com.nutomic.ensichat.core
import java.security.InvalidKeyException
import java.util.Date
import com.nutomic.ensichat.core.body._
import com.nutomic.ensichat.core.header.{AbstractHeader, ContentHeader, MessageHeader}
@ -9,7 +8,7 @@ import com.nutomic.ensichat.core.interfaces._
import com.nutomic.ensichat.core.internet.InternetInterface
import com.nutomic.ensichat.core.util._
import com.typesafe.scalalogging.Logger
import org.joda.time.Duration
import org.joda.time.{DateTime, Duration}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
@ -39,7 +38,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
(a, m) => transmissionInterfaces.foreach(_.send(a, m)),
noRouteFound)
private val messageBuffer = new MessageBuffer(requestRoute)
private lazy val messageBuffer = new MessageBuffer(crypto.localAddress, requestRoute)
/**
* Holds all known users.
@ -64,6 +63,11 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
transmissionInterfaces +=
new InternetInterface(this, crypto, settings, maxInternetConnections, port)
transmissionInterfaces.foreach(_.create())
database.getUnconfirmedMessages.foreach { m =>
val encrypted = crypto.encryptAndSign(m)
messageBuffer.addMessage(encrypted)
requestRoute(encrypted.header.target)
}
}
}
@ -81,7 +85,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
FutureHelper {
val messageId = settings.get("message_id", 0L)
val header = new ContentHeader(crypto.localAddress, target, seqNumGenerator.next(),
body.contentType, Some(messageId), Some(new Date()), AbstractHeader.InitialForwardingTokens)
body.contentType, Some(messageId), Some(DateTime.now), AbstractHeader.InitialForwardingTokens)
settings.put("message_id", messageId + 1)
val msg = new Message(header, body)
@ -100,7 +104,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
val header = new MessageHeader(body.protocolType, crypto.localAddress, Address.Broadcast, seqNum, 0)
val signed = crypto.sign(new Message(header, body))
logger.trace(s"sending new $signed")
router.forwardMessage(signed)
}
@ -110,7 +113,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
val header = new MessageHeader(body.protocolType, crypto.localAddress, replyTo, seqNum, 0)
val signed = crypto.sign(new Message(header, body))
logger.trace(s"sending new $signed")
router.forwardMessage(signed)
}
@ -122,7 +124,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
val body = new RouteError(address, seqNum)
val signed = crypto.sign(new Message(header, body))
logger.trace(s"sending new $signed")
router.forwardMessage(signed)
}
@ -149,7 +150,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
msg.body match {
case rreq: RouteRequest =>
logger.trace(s"Received $msg")
localRoutesInfo.addRoute(msg.header.origin, rreq.originSeqNum, previousHop, rreq.originMetric)
resendMissingRouteMessages()
// TODO: Respecting this causes the RERR test to fail. We have to fix the implementation
@ -172,7 +172,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
}
return
case rrep: RouteReply =>
logger.trace(s"Received $msg")
localRoutesInfo.addRoute(msg.header.origin, rrep.originSeqNum, previousHop, 0)
// TODO: See above (in RREQ handler).
if (routeMessageInfo.isMessageRedundant(msg)) {
@ -198,7 +197,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
router.forwardMessage(forwardMsg)
return
case rerr: RouteError =>
logger.trace(s"Received $msg")
localRoutesInfo.getRoute(rerr.address).foreach { route =>
if (route.nextHop == msg.header.origin && (rerr.seqNum == 0 || rerr.seqNum > route.seqNum)) {
localRoutesInfo.connectionClosed(rerr.address)
@ -231,6 +229,19 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
return
}
// This is necessary because a message is sent to the destination and relays seperately,
// with different sequence numbers. Because of this, we also have to check the message ID
// to avoid duplicate messages.
if (database.getMessages(msg.header.origin).exists(m => m.header.origin == plainMsg.header.origin && m.header.messageId == plainMsg.header.messageId)) {
logger.trace(s"Received message $msg again, ignoring")
return
}
if (plainMsg.body.contentType == Text.Type) {
logger.trace(s"Sending confirmation for $plainMsg")
sendTo(plainMsg.header.origin, new MessageReceived(plainMsg.header.messageId.get))
}
onNewMessage(plainMsg)
}
@ -273,6 +284,8 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
database.updateContact(contact)
callbacks.onConnectionsChanged()
case mr: MessageReceived =>
database.setMessageConfirmed(mr.messageId)
case _ =>
val origin = msg.header.origin
if (origin != crypto.localAddress && database.getContact(origin).isEmpty)

View File

@ -262,6 +262,7 @@ class Crypto(settings: SettingsInterface, keyFolder: File) {
val body = msg.header.asInstanceOf[ContentHeader].contentType match {
case Text.Type => Text.read(decrypted)
case UserInfo.Type => UserInfo.read(decrypted)
case MessageReceived.Type => MessageReceived.read(decrypted)
}
new Message(msg.header, msg.crypto, body)
}

View File

@ -0,0 +1,49 @@
package com.nutomic.ensichat.core.body
import java.nio.ByteBuffer
import com.nutomic.ensichat.core.Message
import com.nutomic.ensichat.core.util.BufferUtils
import scala.Predef.String
object MessageReceived {
val Type = 8
/**
* Constructs [[Text]] instance from byte array.
*/
def read(array: Array[Byte]): MessageReceived = {
val b = ByteBuffer.wrap(array)
val messageId = BufferUtils.getUnsignedInt(b)
new MessageReceived(messageId)
}
}
/**
* Holds a plain text message.
*/
final case class MessageReceived(messageId: Long) extends MessageBody {
override def protocolType = -1
override def contentType = MessageReceived.Type
override def write: Array[Byte] = {
val b = ByteBuffer.allocate(length)
// TODO: This should be putUnsignedLong, but doesn't seem possible in the JVM.
// Alternatively, we could use signed ints instead.
BufferUtils.putUnsignedInt(b, messageId)
b.array()
}
override def length = 4
override def equals(a: Any): Boolean = a match {
case o: MessageReceived => messageId == o.messageId
case _ => false
}
}

View File

@ -1,10 +1,10 @@
package com.nutomic.ensichat.core.header
import java.nio.ByteBuffer
import java.util.Date
import com.nutomic.ensichat.core.Address
import com.nutomic.ensichat.core.util.BufferUtils
import org.joda.time.DateTime
object AbstractHeader {
@ -33,7 +33,7 @@ trait AbstractHeader {
def target: Address
def seqNum: Int
def messageId: Option[Long] = None
def time: Option[Date] = None
def time: Option[DateTime] = None
/**
* Writes the header to byte array.

View File

@ -1,10 +1,10 @@
package com.nutomic.ensichat.core.header
import java.nio.ByteBuffer
import java.util.Date
import com.nutomic.ensichat.core.Address
import com.nutomic.ensichat.core.util.BufferUtils
import org.joda.time.DateTime
object ContentHeader {
@ -25,7 +25,7 @@ object ContentHeader {
val time = BufferUtils.getUnsignedInt(b)
val ch = new ContentHeader(mh.origin, mh.target, mh.seqNum, contentType, Some(messageId),
Some(new Date(time * 1000)), mh.tokens, mh.hopCount)
Some(new DateTime(time * 1000)), mh.tokens, mh.hopCount)
val remaining = new Array[Byte](b.remaining())
b.get(remaining, 0, b.remaining())
@ -44,7 +44,7 @@ final case class ContentHeader(override val origin: Address,
override val seqNum: Int,
contentType: Int,
override val messageId: Some[Long],
override val time: Some[Date],
override val time: Some[DateTime],
override val tokens: Int,
override val hopCount: Int = 0)
extends AbstractHeader {
@ -61,7 +61,7 @@ final case class ContentHeader(override val origin: Address,
BufferUtils.putUnsignedShort(b, contentType)
BufferUtils.putUnsignedInt(b, messageId.get)
BufferUtils.putUnsignedInt(b, time.get.getTime / 1000)
BufferUtils.putUnsignedInt(b, time.get.getMillis / 1000)
b.array()
}
@ -73,7 +73,7 @@ final case class ContentHeader(override val origin: Address,
super.equals(a) &&
contentType == o.contentType &&
messageId == o.messageId &&
time.get.getTime / 1000 == o.time.get.getTime / 1000
time.get.getMillis / 1000 == o.time.get.getMillis / 1000
case _ => false
}

View File

@ -2,14 +2,14 @@ package com.nutomic.ensichat.core.util
import java.io.File
import java.sql.DriverManager
import java.util.Date
import com.nutomic.ensichat.core.body.Text
import com.nutomic.ensichat.core.header.ContentHeader
import com.nutomic.ensichat.core.interfaces.{CallbackInterface, SettingsInterface}
import com.nutomic.ensichat.core.{Address, Message, User}
import com.nutomic.ensichat.core.{Crypto, Address, Message, User}
import com.typesafe.scalalogging.Logger
import org.joda.time
import org.joda.time.DateTime
import slick.driver.H2Driver.api._
import scala.concurrent.Await
@ -30,29 +30,30 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb
private val DatabasePath = "jdbc:h2:" + path.getAbsolutePath + ";DATABASE_TO_UPPER=false"
private class Messages(tag: Tag) extends Table[Message](tag, "MESSAGES") {
def id = primaryKey("id", (origin, messageId))
def origin = column[String]("origin")
def target = column[String]("target")
def messageId = column[Long]("message_id")
def text = column[String]("text")
def date = column[Long]("date")
def tokens = column[Int]("tokens")
def * = (origin, target, messageId, text, date, tokens) <> [Message, (String, String, Long, String, Long, Int)]( {
private class Messages(tag: Tag) extends Table[(Message, Boolean)](tag, "MESSAGES") {
def id = primaryKey("id", (origin, messageId))
def origin = column[String]("origin")
def target = column[String]("target")
def messageId = column[Long]("message_id")
def text = column[String]("text")
def date = column[Long]("date")
def tokens = column[Int]("tokens")
def confirmedDelivered = column[Boolean]("confirmed_delivered")
def * = (origin, target, messageId, text, date, tokens, confirmedDelivered) <> [(Message, Boolean), (String, String, Long, String, Long, Int, Boolean)]( {
tuple =>
val header = new ContentHeader(new Address(tuple._1),
new Address(tuple._2),
-1,
Text.Type,
Some(tuple._3),
Some(new Date(tuple._5)),
Some(new DateTime(tuple._5)),
tuple._6)
val body = new Text(tuple._4)
new Message(header, body)
(new Message(header, body), tuple._7)
}, message =>
Option((message.header.origin.toString(), message.header.target.toString(),
message.header.messageId.get, message.body.asInstanceOf[Text].text,
message.header.time.get.getTime, message.header.tokens))
Option((message._1.header.origin.toString(), message._1.header.target.toString(),
message._1.header.messageId.get, message._1.body.asInstanceOf[Text].text,
message._1.header.time.get.getMillis, message._1.header.tokens, message._2))
)
}
private val messages = TableQuery[Messages]
@ -109,15 +110,13 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb
connection.createStatement().executeUpdate("ALTER TABLE MESSAGES ADD COLUMN (tokens INT);")
connection.commit()
Await.result(db.run(knownDevices.schema.create), Duration.Inf)
connection.createStatement().executeUpdate("ALTER TABLE MESSAGES ADD COLUMN (confirmed_delivered INT);")
connection.commit()
}
connection.close()
settings.put(DatabaseVersionKey, DatabaseVersion)
}
// Apparently, slick doesn't support ALTER TABLE, so we have to write raw SQL for this...
{
}
def close(): Unit = {
Await.result(db.shutdown, Duration.Inf)
}
@ -126,7 +125,8 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb
* Inserts the given new message into the database.
*/
def onMessageReceived(msg: Message): Unit = msg.body match {
case _: Text => Await.result(db.run(messages += msg), Duration.Inf)
case _: Text =>
Await.result(db.run(messages += (msg, false)), Duration.Inf)
case _ =>
}
@ -134,7 +134,7 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb
val query = messages.filter { m =>
m.origin === address.toString || m.target === address.toString
}
Await.result(db.run(query.result), Duration.Inf)
Await.result(db.run(query.result), Duration.Inf).map(_._1)
}
/**
@ -201,4 +201,24 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb
Await.result(db.run(query), Duration.Inf)
}
def setMessageConfirmed(messageId: Long): Unit = {
val localAddress = new Address(settings.get(Crypto.LocalAddressKey, ""))
val query = messages.filter { c =>
c.origin === localAddress.toString &&
c.messageId === messageId
}
.map(_.confirmedDelivered)
.update(true)
Await.result(db.run(query), Duration.Inf)
}
/**
* Returns all addresses sent by us that have confirmedDlivered = false.
*/
def getUnconfirmedMessages: Set[Message] = {
val localAddress = new Address(settings.get(Crypto.LocalAddressKey, ""))
val query = messages.filter(m => m.origin === localAddress.toString && !m.confirmedDelivered)
Await.result(db.run(query.result), Duration.Inf).map(_._1).toSet
}
}

View File

@ -11,7 +11,7 @@ private[core] object LocalRoutesInfo {
/**
* [[RouteStates.Idle]]:
* A route that is known, but has not been used in the last [[ActiveInterval.
* A route that is known, but has not been used in the last [[ActiveInterval]].
* [[RouteStates.Active]]:
* A route that is known, and has been used in the last [[ActiveInterval]].
* [[RouteStates.Invalid]]:

View File

@ -3,12 +3,12 @@ package com.nutomic.ensichat.core.util
import java.util.{TimerTask, Timer}
import com.nutomic.ensichat.core.{Address, Message}
import org.joda.time.{DateTime, Duration}
import org.joda.time.{Seconds, DateTime, Duration}
/**
* Contains messages that couldn't be forwarded because we don't know a route.
*/
class MessageBuffer(retryMessageSending: (Address) => Unit) {
class MessageBuffer(localAddress: Address, retryMessageSending: (Address) => Unit) {
/**
* The maximum number of times we retry to deliver a message.
@ -29,7 +29,13 @@ class MessageBuffer(retryMessageSending: (Address) => Unit) {
}
def addMessage(msg: Message): Unit = {
val newEntry = new BufferEntry(msg, DateTime.now, 0)
// For old messages added back from database, find their retry count from send time and offset.
val retryCount =
(0 to 6).find { i =>
msg.header.time.get.plus(calculateNextRetryOffset(i)).isAfter(DateTime.now)
}
.getOrElse(6)
val newEntry = new BufferEntry(msg, DateTime.now, retryCount)
values += newEntry
retryMessage(newEntry)
}
@ -86,7 +92,7 @@ class MessageBuffer(retryMessageSending: (Address) => Unit) {
private def handleTimeouts(): Unit = {
values = values.filter { e =>
e.retryCount < MaxRetryCount
e.retryCount < MaxRetryCount && e.message.header.origin != localAddress
}
}

View File

@ -1,12 +1,13 @@
package com.nutomic.ensichat.core
import java.util.GregorianCalendar
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.{Date, GregorianCalendar}
import com.nutomic.ensichat.core.body.{Text, UserInfo}
import com.nutomic.ensichat.core.header.ContentHeader
import com.nutomic.ensichat.core.util.LocalRoutesInfo
import junit.framework.TestCase
import org.joda.time.DateTime
import org.junit.Assert._
class RouterTest extends TestCase {
@ -93,14 +94,14 @@ class RouterTest extends TestCase {
def testHopLimit(): Unit = Range(19, 22).foreach { i =>
val msg = new Message(
new ContentHeader(AddressTest.a1, AddressTest.a2, 1, 1, Some(1), Some(new Date()), 3, i), new Text(""))
new ContentHeader(AddressTest.a1, AddressTest.a2, 1, 1, Some(1), Some(DateTime.now), 3, i), new Text(""))
val router = new Router(new LocalRoutesInfo(neighbors), (a, m) => fail(), _ => ())
router.forwardMessage(msg)
}
private def generateMessage(sender: Address, receiver: Address, seqNum: Int): Message = {
val header = new ContentHeader(sender, receiver, seqNum, UserInfo.Type, Some(5),
Some(new GregorianCalendar(2014, 6, 10).getTime), 3)
Some(new DateTime(new GregorianCalendar(2014, 6, 10).getTime)), 3)
new Message(header, new UserInfo("", ""))
}

View File

@ -5,24 +5,25 @@ import java.util.{Date, GregorianCalendar}
import com.nutomic.ensichat.core.body.Text
import com.nutomic.ensichat.core.{Address, AddressTest}
import junit.framework.TestCase
import org.joda.time.DateTime
import org.junit.Assert._
object ContentHeaderTest {
val h1 = new ContentHeader(AddressTest.a1, AddressTest.a2, 1234,
Text.Type, Some(123), Some(new GregorianCalendar(1970, 1, 1).getTime), 3)
Text.Type, Some(123), Some(new DateTime(new GregorianCalendar(1970, 1, 1).getTime)), 3)
val h2 = new ContentHeader(AddressTest.a1, AddressTest.a3,
30000, Text.Type, Some(8765), Some(new GregorianCalendar(2014, 6, 10).getTime), 2)
30000, Text.Type, Some(8765), Some(new DateTime(new GregorianCalendar(2014, 6, 10))), 2)
val h3 = new ContentHeader(AddressTest.a4, AddressTest.a2,
250, Text.Type, Some(77), Some(new GregorianCalendar(2020, 11, 11).getTime), 1)
250, Text.Type, Some(77), Some(new DateTime(new GregorianCalendar(2020, 11, 11).getTime)), 1)
val h4 = new ContentHeader(Address.Null, Address.Broadcast,
ContentHeader.SeqNumRange.last, 0, Some(0xffff), Some(new Date(0L)), 6)
ContentHeader.SeqNumRange.last, 0, Some(0xffff), Some(new DateTime(0L)), 6)
val h5 = new ContentHeader(Address.Broadcast, Address.Null,
0, 0xff, Some(0), Some(new Date(0xffffffffL)), 0)
0, 0xff, Some(0), Some(new DateTime(0xffffffffL)), 0)
val headers = Set(h1, h2, h3, h4, h5)

View File

@ -6,11 +6,12 @@ import java.util.concurrent.CountDownLatch
import com.nutomic.ensichat.core.body.Text
import com.nutomic.ensichat.core.header.ContentHeader
import com.nutomic.ensichat.core.interfaces.{SettingsInterface, CallbackInterface}
import com.nutomic.ensichat.core.interfaces.{CallbackInterface, SettingsInterface}
import com.nutomic.ensichat.core.util.DatabaseTest._
import com.nutomic.ensichat.core.{Address, Message, User}
import junit.framework.Assert._
import junit.framework.TestCase
import org.joda.time.DateTime
object DatabaseTest {
@ -20,11 +21,11 @@ object DatabaseTest {
private val a4 = new Address("4444459893F8810C4024CFC951374AABA1F4DE6347A3D7D8E44918AD1FF2BA36")
private val h1 = new ContentHeader(a2, a1, -1, Text.Type, Some(123),
Some(new GregorianCalendar(1970, 1, 1).getTime), 0)
Some(new DateTime(new GregorianCalendar(1970, 1, 1).getTime)), 0)
private val h2 = new ContentHeader(a1, a3, -1, Text.Type, Some(8765),
Some(new GregorianCalendar(2014, 6, 10).getTime), 0)
Some(new DateTime(new GregorianCalendar(2014, 6, 10).getTime)), 0)
private val h3 = new ContentHeader(a4, a2, -1, Text.Type, Some(77),
Some(new GregorianCalendar(2020, 11, 11).getTime), 0)
Some(new DateTime(new GregorianCalendar(2020, 11, 11).getTime)), 0)
private val m1 = new Message(h1, new Text("first"))
private val m2 = new Message(h2, new Text("second"))

View File

@ -1,29 +1,41 @@
package com.nutomic.ensichat.core.util
import java.util.concurrent.{TimeUnit, CountDownLatch}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import com.nutomic.ensichat.core.MessageTest
import com.nutomic.ensichat.core.header.ContentHeader
import com.nutomic.ensichat.core.{Message, Address, MessageTest}
import junit.framework.TestCase
import org.joda.time.DateTime
import org.junit.Assert._
class MessageBufferTest extends TestCase {
/**
* MessageBuffer checks the time of a message, we have to use the current time or items might
* time out.
*/
private def adjustMessageTime(m: Message) =
new Message(m.header.asInstanceOf[ContentHeader].copy(time=Some(DateTime.now)), m.body)
val m1 = adjustMessageTime(MessageTest.m1)
val m2 = adjustMessageTime(MessageTest.m2)
def testGetMessages(): Unit = {
val buffer = new MessageBuffer(() => _)
buffer.addMessage(MessageTest.m1)
buffer.addMessage(MessageTest.m2)
val msgs = buffer.getMessages(MessageTest.m1.header.target)
val buffer = new MessageBuffer(Address.Null, () => _)
buffer.addMessage(m1)
buffer.addMessage(m2)
val msgs = buffer.getMessages(m1.header.target)
assertEquals(1, msgs.size)
assertEquals(MessageTest.m1, msgs.head)
assertEquals(m1, msgs.head)
}
def testRetryMessage(): Unit = {
val latch = new CountDownLatch(1)
val buffer = new MessageBuffer({e =>
assertEquals(MessageTest.m1.header.target, e)
val buffer = new MessageBuffer(Address.Null, {e =>
assertEquals(m1.header.target, e)
latch.countDown()
})
buffer.addMessage(MessageTest.m1)
buffer.addMessage(m1)
assertTrue(latch.await(15, TimeUnit.SECONDS))
}

View File

@ -50,9 +50,9 @@ class LocalNode(val index: Int, configFolder: File) extends CallbackInterface {
override def get[T](key: String, default: T): T = values.get(key).map(_.asInstanceOf[T]).getOrElse(default)
override def put[T](key: String, value: T): Unit = values += (key -> value.asInstanceOf[Any])
}
private val database = new Database(databaseFile, settings, this)
val crypto = new Crypto(settings, keyFolder)
val database = new Database(databaseFile, settings, this)
val connectionHandler = new ConnectionHandler(settings, database, this, crypto, 0, port)
val eventQueue = new FifoStream[(EventType.EventType, Option[Message])]()

View File

@ -1,14 +1,14 @@
package com.nutomic.ensichat.integration
import java.io.File
import java.util.{TimerTask, Timer}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.{Timer, TimerTask}
import com.nutomic.ensichat.core.Crypto
import com.nutomic.ensichat.core.body.Text
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{DurationLong, Duration}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.Try
import scalax.file.Path
@ -17,11 +17,12 @@ import scalax.file.Path
* Creates some local nodes, connects them and sends messages between them.
*
* If the test runs slow or fails, changing [[Crypto.PublicKeySize]] to 512 should help.
*
* These tests are somewhat fragile, and might fail randomly. It helps to run only one of
* the test functions at a time.
*/
object Main extends App {
// NOTE: These tests are somewhat fragile, and might fail randomly. It helps to run only
// one of the following functions at a time.
testNeighborSending()
testMeshMessageSending()
testIndirectRelay()
@ -29,6 +30,7 @@ object Main extends App {
testMessageDeliveryOnConnect()
testSendDelayed()
testRouteChange()
testMessageConfirmation()
private def testNeighborSending(): Unit = {
val node1 = Await.result(createNode(1), Duration.Inf)
@ -154,6 +156,20 @@ object Main extends App {
System.out.println("Test send delayed successful!")
}
/**
* Check that message confirmation is sent back after message was received.
*/
private def testMessageConfirmation(): Unit = {
val nodes = createNodes(2)
connectNodes(nodes(0), nodes(1))
sendMessage(nodes(0), nodes(1))
assert(nodes(0).database.getMessages(nodes(1).crypto.localAddress).nonEmpty)
assert(nodes(0).database.getUnconfirmedMessages.isEmpty)
nodes.foreach(_.stop())
}
private def createNodes(count: Int): Seq[LocalNode] = {
val nodes = Await.result(Future.sequence((0 until count).map(createNode)), Duration.Inf)
nodes.foreach(n => System.out.println(s"Node ${n.index} has address ${n.crypto.localAddress}"))