From 579d1f57172ace8656ddcaca009abb61237bead1 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Tue, 12 Jul 2016 22:59:54 +0200 Subject: [PATCH] Added message confirmations to protocol (ref #22). --- PROTOCOL.md | 13 +++- .../ensichat/service/ChatService.scala | 7 +- .../ensichat/core/ConnectionHandler.scala | 33 +++++++--- .../com/nutomic/ensichat/core/Crypto.scala | 1 + .../ensichat/core/body/MessageReceived.scala | 49 ++++++++++++++ .../ensichat/core/header/AbstractHeader.scala | 4 +- .../ensichat/core/header/ContentHeader.scala | 10 +-- .../nutomic/ensichat/core/util/Database.scala | 64 ++++++++++++------- .../ensichat/core/util/LocalRoutesInfo.scala | 2 +- .../ensichat/core/util/MessageBuffer.scala | 14 ++-- .../nutomic/ensichat/core/RouterTest.scala | 7 +- .../core/header/ContentHeaderTest.scala | 11 ++-- .../ensichat/core/util/DatabaseTest.scala | 9 +-- .../core/util/MessageBufferTest.scala | 32 +++++++--- .../LocalNode.scala | 2 +- .../Main.scala | 24 +++++-- 16 files changed, 207 insertions(+), 75 deletions(-) create mode 100644 core/src/main/scala/com/nutomic/ensichat/core/body/MessageReceived.scala diff --git a/PROTOCOL.md b/PROTOCOL.md index 1923f43..5be8245 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -85,7 +85,7 @@ version, type and ID, followed by the length of the message. 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Version | Protocol-Type | Tokens | Hop Limit | + | Version | Protocol-Type | Tokens | Hop Count | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -327,3 +327,14 @@ Text the string to be transferred, encoded as UTF-8. Contains the sender's name and status, which should be used for display to users. + +### MessageReceived (Content-Type = 8) + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Message ID | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +Confirms that a previous content message has been received by the +target node. Message ID is the ID of that message. diff --git a/android/src/main/scala/com/nutomic/ensichat/service/ChatService.scala b/android/src/main/scala/com/nutomic/ensichat/service/ChatService.scala index 58ea3dd..d71ade8 100644 --- a/android/src/main/scala/com/nutomic/ensichat/service/ChatService.scala +++ b/android/src/main/scala/com/nutomic/ensichat/service/ChatService.scala @@ -32,11 +32,12 @@ class ChatService extends Service { private val callbackHandler = new CallbackHandler(this, notificationHandler) - lazy val database = new Database(getDatabasePath("database"), callbackHandler) + private def settingsWrapper = new SettingsWrapper(this) + + lazy val database = new Database(getDatabasePath("database"), settingsWrapper, callbackHandler) private lazy val connectionHandler = - new ConnectionHandler(new SettingsWrapper(this), database, callbackHandler, - ChatService.newCrypto(this), 1) + new ConnectionHandler(settingsWrapper, database, callbackHandler, ChatService.newCrypto(this), 1) private val networkReceiver = new NetworkChangedReceiver() diff --git a/core/src/main/scala/com/nutomic/ensichat/core/ConnectionHandler.scala b/core/src/main/scala/com/nutomic/ensichat/core/ConnectionHandler.scala index 3ff4bde..daeba3d 100644 --- a/core/src/main/scala/com/nutomic/ensichat/core/ConnectionHandler.scala +++ b/core/src/main/scala/com/nutomic/ensichat/core/ConnectionHandler.scala @@ -1,7 +1,6 @@ package com.nutomic.ensichat.core import java.security.InvalidKeyException -import java.util.Date import com.nutomic.ensichat.core.body._ import com.nutomic.ensichat.core.header.{AbstractHeader, ContentHeader, MessageHeader} @@ -9,7 +8,7 @@ import com.nutomic.ensichat.core.interfaces._ import com.nutomic.ensichat.core.internet.InternetInterface import com.nutomic.ensichat.core.util._ import com.typesafe.scalalogging.Logger -import org.joda.time.Duration +import org.joda.time.{DateTime, Duration} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -39,7 +38,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, (a, m) => transmissionInterfaces.foreach(_.send(a, m)), noRouteFound) - private val messageBuffer = new MessageBuffer(requestRoute) + private lazy val messageBuffer = new MessageBuffer(crypto.localAddress, requestRoute) /** * Holds all known users. @@ -64,6 +63,11 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, transmissionInterfaces += new InternetInterface(this, crypto, settings, maxInternetConnections, port) transmissionInterfaces.foreach(_.create()) + database.getUnconfirmedMessages.foreach { m => + val encrypted = crypto.encryptAndSign(m) + messageBuffer.addMessage(encrypted) + requestRoute(encrypted.header.target) + } } } @@ -81,7 +85,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, FutureHelper { val messageId = settings.get("message_id", 0L) val header = new ContentHeader(crypto.localAddress, target, seqNumGenerator.next(), - body.contentType, Some(messageId), Some(new Date()), AbstractHeader.InitialForwardingTokens) + body.contentType, Some(messageId), Some(DateTime.now), AbstractHeader.InitialForwardingTokens) settings.put("message_id", messageId + 1) val msg = new Message(header, body) @@ -100,7 +104,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, val header = new MessageHeader(body.protocolType, crypto.localAddress, Address.Broadcast, seqNum, 0) val signed = crypto.sign(new Message(header, body)) - logger.trace(s"sending new $signed") router.forwardMessage(signed) } @@ -110,7 +113,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, val header = new MessageHeader(body.protocolType, crypto.localAddress, replyTo, seqNum, 0) val signed = crypto.sign(new Message(header, body)) - logger.trace(s"sending new $signed") router.forwardMessage(signed) } @@ -122,7 +124,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, val body = new RouteError(address, seqNum) val signed = crypto.sign(new Message(header, body)) - logger.trace(s"sending new $signed") router.forwardMessage(signed) } @@ -149,7 +150,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, msg.body match { case rreq: RouteRequest => - logger.trace(s"Received $msg") localRoutesInfo.addRoute(msg.header.origin, rreq.originSeqNum, previousHop, rreq.originMetric) resendMissingRouteMessages() // TODO: Respecting this causes the RERR test to fail. We have to fix the implementation @@ -172,7 +172,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, } return case rrep: RouteReply => - logger.trace(s"Received $msg") localRoutesInfo.addRoute(msg.header.origin, rrep.originSeqNum, previousHop, 0) // TODO: See above (in RREQ handler). if (routeMessageInfo.isMessageRedundant(msg)) { @@ -198,7 +197,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, router.forwardMessage(forwardMsg) return case rerr: RouteError => - logger.trace(s"Received $msg") localRoutesInfo.getRoute(rerr.address).foreach { route => if (route.nextHop == msg.header.origin && (rerr.seqNum == 0 || rerr.seqNum > route.seqNum)) { localRoutesInfo.connectionClosed(rerr.address) @@ -231,6 +229,19 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, return } + // This is necessary because a message is sent to the destination and relays seperately, + // with different sequence numbers. Because of this, we also have to check the message ID + // to avoid duplicate messages. + if (database.getMessages(msg.header.origin).exists(m => m.header.origin == plainMsg.header.origin && m.header.messageId == plainMsg.header.messageId)) { + logger.trace(s"Received message $msg again, ignoring") + return + } + + if (plainMsg.body.contentType == Text.Type) { + logger.trace(s"Sending confirmation for $plainMsg") + sendTo(plainMsg.header.origin, new MessageReceived(plainMsg.header.messageId.get)) + } + onNewMessage(plainMsg) } @@ -273,6 +284,8 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database, database.updateContact(contact) callbacks.onConnectionsChanged() + case mr: MessageReceived => + database.setMessageConfirmed(mr.messageId) case _ => val origin = msg.header.origin if (origin != crypto.localAddress && database.getContact(origin).isEmpty) diff --git a/core/src/main/scala/com/nutomic/ensichat/core/Crypto.scala b/core/src/main/scala/com/nutomic/ensichat/core/Crypto.scala index a6f5be3..7a430dd 100644 --- a/core/src/main/scala/com/nutomic/ensichat/core/Crypto.scala +++ b/core/src/main/scala/com/nutomic/ensichat/core/Crypto.scala @@ -262,6 +262,7 @@ class Crypto(settings: SettingsInterface, keyFolder: File) { val body = msg.header.asInstanceOf[ContentHeader].contentType match { case Text.Type => Text.read(decrypted) case UserInfo.Type => UserInfo.read(decrypted) + case MessageReceived.Type => MessageReceived.read(decrypted) } new Message(msg.header, msg.crypto, body) } diff --git a/core/src/main/scala/com/nutomic/ensichat/core/body/MessageReceived.scala b/core/src/main/scala/com/nutomic/ensichat/core/body/MessageReceived.scala new file mode 100644 index 0000000..11d3696 --- /dev/null +++ b/core/src/main/scala/com/nutomic/ensichat/core/body/MessageReceived.scala @@ -0,0 +1,49 @@ +package com.nutomic.ensichat.core.body + +import java.nio.ByteBuffer + +import com.nutomic.ensichat.core.Message +import com.nutomic.ensichat.core.util.BufferUtils + +import scala.Predef.String + +object MessageReceived { + + val Type = 8 + + /** + * Constructs [[Text]] instance from byte array. + */ + def read(array: Array[Byte]): MessageReceived = { + val b = ByteBuffer.wrap(array) + val messageId = BufferUtils.getUnsignedInt(b) + new MessageReceived(messageId) + } + +} + +/** + * Holds a plain text message. + */ +final case class MessageReceived(messageId: Long) extends MessageBody { + + override def protocolType = -1 + + override def contentType = MessageReceived.Type + + override def write: Array[Byte] = { + val b = ByteBuffer.allocate(length) + // TODO: This should be putUnsignedLong, but doesn't seem possible in the JVM. + // Alternatively, we could use signed ints instead. + BufferUtils.putUnsignedInt(b, messageId) + b.array() + } + + override def length = 4 + + override def equals(a: Any): Boolean = a match { + case o: MessageReceived => messageId == o.messageId + case _ => false + } + +} diff --git a/core/src/main/scala/com/nutomic/ensichat/core/header/AbstractHeader.scala b/core/src/main/scala/com/nutomic/ensichat/core/header/AbstractHeader.scala index 70a4752..ad13467 100644 --- a/core/src/main/scala/com/nutomic/ensichat/core/header/AbstractHeader.scala +++ b/core/src/main/scala/com/nutomic/ensichat/core/header/AbstractHeader.scala @@ -1,10 +1,10 @@ package com.nutomic.ensichat.core.header import java.nio.ByteBuffer -import java.util.Date import com.nutomic.ensichat.core.Address import com.nutomic.ensichat.core.util.BufferUtils +import org.joda.time.DateTime object AbstractHeader { @@ -33,7 +33,7 @@ trait AbstractHeader { def target: Address def seqNum: Int def messageId: Option[Long] = None - def time: Option[Date] = None + def time: Option[DateTime] = None /** * Writes the header to byte array. diff --git a/core/src/main/scala/com/nutomic/ensichat/core/header/ContentHeader.scala b/core/src/main/scala/com/nutomic/ensichat/core/header/ContentHeader.scala index f58ce36..9fa1866 100644 --- a/core/src/main/scala/com/nutomic/ensichat/core/header/ContentHeader.scala +++ b/core/src/main/scala/com/nutomic/ensichat/core/header/ContentHeader.scala @@ -1,10 +1,10 @@ package com.nutomic.ensichat.core.header import java.nio.ByteBuffer -import java.util.Date import com.nutomic.ensichat.core.Address import com.nutomic.ensichat.core.util.BufferUtils +import org.joda.time.DateTime object ContentHeader { @@ -25,7 +25,7 @@ object ContentHeader { val time = BufferUtils.getUnsignedInt(b) val ch = new ContentHeader(mh.origin, mh.target, mh.seqNum, contentType, Some(messageId), - Some(new Date(time * 1000)), mh.tokens, mh.hopCount) + Some(new DateTime(time * 1000)), mh.tokens, mh.hopCount) val remaining = new Array[Byte](b.remaining()) b.get(remaining, 0, b.remaining()) @@ -44,7 +44,7 @@ final case class ContentHeader(override val origin: Address, override val seqNum: Int, contentType: Int, override val messageId: Some[Long], - override val time: Some[Date], + override val time: Some[DateTime], override val tokens: Int, override val hopCount: Int = 0) extends AbstractHeader { @@ -61,7 +61,7 @@ final case class ContentHeader(override val origin: Address, BufferUtils.putUnsignedShort(b, contentType) BufferUtils.putUnsignedInt(b, messageId.get) - BufferUtils.putUnsignedInt(b, time.get.getTime / 1000) + BufferUtils.putUnsignedInt(b, time.get.getMillis / 1000) b.array() } @@ -73,7 +73,7 @@ final case class ContentHeader(override val origin: Address, super.equals(a) && contentType == o.contentType && messageId == o.messageId && - time.get.getTime / 1000 == o.time.get.getTime / 1000 + time.get.getMillis / 1000 == o.time.get.getMillis / 1000 case _ => false } diff --git a/core/src/main/scala/com/nutomic/ensichat/core/util/Database.scala b/core/src/main/scala/com/nutomic/ensichat/core/util/Database.scala index 9833f50..e58f2a4 100644 --- a/core/src/main/scala/com/nutomic/ensichat/core/util/Database.scala +++ b/core/src/main/scala/com/nutomic/ensichat/core/util/Database.scala @@ -2,14 +2,14 @@ package com.nutomic.ensichat.core.util import java.io.File import java.sql.DriverManager -import java.util.Date import com.nutomic.ensichat.core.body.Text import com.nutomic.ensichat.core.header.ContentHeader import com.nutomic.ensichat.core.interfaces.{CallbackInterface, SettingsInterface} -import com.nutomic.ensichat.core.{Address, Message, User} +import com.nutomic.ensichat.core.{Crypto, Address, Message, User} import com.typesafe.scalalogging.Logger import org.joda.time +import org.joda.time.DateTime import slick.driver.H2Driver.api._ import scala.concurrent.Await @@ -30,29 +30,30 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb private val DatabasePath = "jdbc:h2:" + path.getAbsolutePath + ";DATABASE_TO_UPPER=false" - private class Messages(tag: Tag) extends Table[Message](tag, "MESSAGES") { - def id = primaryKey("id", (origin, messageId)) - def origin = column[String]("origin") - def target = column[String]("target") - def messageId = column[Long]("message_id") - def text = column[String]("text") - def date = column[Long]("date") - def tokens = column[Int]("tokens") - def * = (origin, target, messageId, text, date, tokens) <> [Message, (String, String, Long, String, Long, Int)]( { + private class Messages(tag: Tag) extends Table[(Message, Boolean)](tag, "MESSAGES") { + def id = primaryKey("id", (origin, messageId)) + def origin = column[String]("origin") + def target = column[String]("target") + def messageId = column[Long]("message_id") + def text = column[String]("text") + def date = column[Long]("date") + def tokens = column[Int]("tokens") + def confirmedDelivered = column[Boolean]("confirmed_delivered") + def * = (origin, target, messageId, text, date, tokens, confirmedDelivered) <> [(Message, Boolean), (String, String, Long, String, Long, Int, Boolean)]( { tuple => val header = new ContentHeader(new Address(tuple._1), new Address(tuple._2), -1, Text.Type, Some(tuple._3), - Some(new Date(tuple._5)), + Some(new DateTime(tuple._5)), tuple._6) val body = new Text(tuple._4) - new Message(header, body) + (new Message(header, body), tuple._7) }, message => - Option((message.header.origin.toString(), message.header.target.toString(), - message.header.messageId.get, message.body.asInstanceOf[Text].text, - message.header.time.get.getTime, message.header.tokens)) + Option((message._1.header.origin.toString(), message._1.header.target.toString(), + message._1.header.messageId.get, message._1.body.asInstanceOf[Text].text, + message._1.header.time.get.getMillis, message._1.header.tokens, message._2)) ) } private val messages = TableQuery[Messages] @@ -109,15 +110,13 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb connection.createStatement().executeUpdate("ALTER TABLE MESSAGES ADD COLUMN (tokens INT);") connection.commit() Await.result(db.run(knownDevices.schema.create), Duration.Inf) + connection.createStatement().executeUpdate("ALTER TABLE MESSAGES ADD COLUMN (confirmed_delivered INT);") + connection.commit() } connection.close() settings.put(DatabaseVersionKey, DatabaseVersion) } - // Apparently, slick doesn't support ALTER TABLE, so we have to write raw SQL for this... - { - } - def close(): Unit = { Await.result(db.shutdown, Duration.Inf) } @@ -126,7 +125,8 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb * Inserts the given new message into the database. */ def onMessageReceived(msg: Message): Unit = msg.body match { - case _: Text => Await.result(db.run(messages += msg), Duration.Inf) + case _: Text => + Await.result(db.run(messages += (msg, false)), Duration.Inf) case _ => } @@ -134,7 +134,7 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb val query = messages.filter { m => m.origin === address.toString || m.target === address.toString } - Await.result(db.run(query.result), Duration.Inf) + Await.result(db.run(query.result), Duration.Inf).map(_._1) } /** @@ -201,4 +201,24 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb Await.result(db.run(query), Duration.Inf) } + def setMessageConfirmed(messageId: Long): Unit = { + val localAddress = new Address(settings.get(Crypto.LocalAddressKey, "")) + val query = messages.filter { c => + c.origin === localAddress.toString && + c.messageId === messageId + } + .map(_.confirmedDelivered) + .update(true) + Await.result(db.run(query), Duration.Inf) + } + + /** + * Returns all addresses sent by us that have confirmedDlivered = false. + */ + def getUnconfirmedMessages: Set[Message] = { + val localAddress = new Address(settings.get(Crypto.LocalAddressKey, "")) + val query = messages.filter(m => m.origin === localAddress.toString && !m.confirmedDelivered) + Await.result(db.run(query.result), Duration.Inf).map(_._1).toSet + } + } diff --git a/core/src/main/scala/com/nutomic/ensichat/core/util/LocalRoutesInfo.scala b/core/src/main/scala/com/nutomic/ensichat/core/util/LocalRoutesInfo.scala index 4f77fb8..0bb973e 100644 --- a/core/src/main/scala/com/nutomic/ensichat/core/util/LocalRoutesInfo.scala +++ b/core/src/main/scala/com/nutomic/ensichat/core/util/LocalRoutesInfo.scala @@ -11,7 +11,7 @@ private[core] object LocalRoutesInfo { /** * [[RouteStates.Idle]]: - * A route that is known, but has not been used in the last [[ActiveInterval. + * A route that is known, but has not been used in the last [[ActiveInterval]]. * [[RouteStates.Active]]: * A route that is known, and has been used in the last [[ActiveInterval]]. * [[RouteStates.Invalid]]: diff --git a/core/src/main/scala/com/nutomic/ensichat/core/util/MessageBuffer.scala b/core/src/main/scala/com/nutomic/ensichat/core/util/MessageBuffer.scala index 988a405..6b10cad 100644 --- a/core/src/main/scala/com/nutomic/ensichat/core/util/MessageBuffer.scala +++ b/core/src/main/scala/com/nutomic/ensichat/core/util/MessageBuffer.scala @@ -3,12 +3,12 @@ package com.nutomic.ensichat.core.util import java.util.{TimerTask, Timer} import com.nutomic.ensichat.core.{Address, Message} -import org.joda.time.{DateTime, Duration} +import org.joda.time.{Seconds, DateTime, Duration} /** * Contains messages that couldn't be forwarded because we don't know a route. */ -class MessageBuffer(retryMessageSending: (Address) => Unit) { +class MessageBuffer(localAddress: Address, retryMessageSending: (Address) => Unit) { /** * The maximum number of times we retry to deliver a message. @@ -29,7 +29,13 @@ class MessageBuffer(retryMessageSending: (Address) => Unit) { } def addMessage(msg: Message): Unit = { - val newEntry = new BufferEntry(msg, DateTime.now, 0) + // For old messages added back from database, find their retry count from send time and offset. + val retryCount = + (0 to 6).find { i => + msg.header.time.get.plus(calculateNextRetryOffset(i)).isAfter(DateTime.now) + } + .getOrElse(6) + val newEntry = new BufferEntry(msg, DateTime.now, retryCount) values += newEntry retryMessage(newEntry) } @@ -86,7 +92,7 @@ class MessageBuffer(retryMessageSending: (Address) => Unit) { private def handleTimeouts(): Unit = { values = values.filter { e => - e.retryCount < MaxRetryCount + e.retryCount < MaxRetryCount && e.message.header.origin != localAddress } } diff --git a/core/src/test/scala/com/nutomic/ensichat/core/RouterTest.scala b/core/src/test/scala/com/nutomic/ensichat/core/RouterTest.scala index 677626a..8deab2d 100644 --- a/core/src/test/scala/com/nutomic/ensichat/core/RouterTest.scala +++ b/core/src/test/scala/com/nutomic/ensichat/core/RouterTest.scala @@ -1,12 +1,13 @@ package com.nutomic.ensichat.core +import java.util.GregorianCalendar import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.{Date, GregorianCalendar} import com.nutomic.ensichat.core.body.{Text, UserInfo} import com.nutomic.ensichat.core.header.ContentHeader import com.nutomic.ensichat.core.util.LocalRoutesInfo import junit.framework.TestCase +import org.joda.time.DateTime import org.junit.Assert._ class RouterTest extends TestCase { @@ -93,14 +94,14 @@ class RouterTest extends TestCase { def testHopLimit(): Unit = Range(19, 22).foreach { i => val msg = new Message( - new ContentHeader(AddressTest.a1, AddressTest.a2, 1, 1, Some(1), Some(new Date()), 3, i), new Text("")) + new ContentHeader(AddressTest.a1, AddressTest.a2, 1, 1, Some(1), Some(DateTime.now), 3, i), new Text("")) val router = new Router(new LocalRoutesInfo(neighbors), (a, m) => fail(), _ => ()) router.forwardMessage(msg) } private def generateMessage(sender: Address, receiver: Address, seqNum: Int): Message = { val header = new ContentHeader(sender, receiver, seqNum, UserInfo.Type, Some(5), - Some(new GregorianCalendar(2014, 6, 10).getTime), 3) + Some(new DateTime(new GregorianCalendar(2014, 6, 10).getTime)), 3) new Message(header, new UserInfo("", "")) } diff --git a/core/src/test/scala/com/nutomic/ensichat/core/header/ContentHeaderTest.scala b/core/src/test/scala/com/nutomic/ensichat/core/header/ContentHeaderTest.scala index 9717959..3638ff5 100644 --- a/core/src/test/scala/com/nutomic/ensichat/core/header/ContentHeaderTest.scala +++ b/core/src/test/scala/com/nutomic/ensichat/core/header/ContentHeaderTest.scala @@ -5,24 +5,25 @@ import java.util.{Date, GregorianCalendar} import com.nutomic.ensichat.core.body.Text import com.nutomic.ensichat.core.{Address, AddressTest} import junit.framework.TestCase +import org.joda.time.DateTime import org.junit.Assert._ object ContentHeaderTest { val h1 = new ContentHeader(AddressTest.a1, AddressTest.a2, 1234, - Text.Type, Some(123), Some(new GregorianCalendar(1970, 1, 1).getTime), 3) + Text.Type, Some(123), Some(new DateTime(new GregorianCalendar(1970, 1, 1).getTime)), 3) val h2 = new ContentHeader(AddressTest.a1, AddressTest.a3, - 30000, Text.Type, Some(8765), Some(new GregorianCalendar(2014, 6, 10).getTime), 2) + 30000, Text.Type, Some(8765), Some(new DateTime(new GregorianCalendar(2014, 6, 10))), 2) val h3 = new ContentHeader(AddressTest.a4, AddressTest.a2, - 250, Text.Type, Some(77), Some(new GregorianCalendar(2020, 11, 11).getTime), 1) + 250, Text.Type, Some(77), Some(new DateTime(new GregorianCalendar(2020, 11, 11).getTime)), 1) val h4 = new ContentHeader(Address.Null, Address.Broadcast, - ContentHeader.SeqNumRange.last, 0, Some(0xffff), Some(new Date(0L)), 6) + ContentHeader.SeqNumRange.last, 0, Some(0xffff), Some(new DateTime(0L)), 6) val h5 = new ContentHeader(Address.Broadcast, Address.Null, - 0, 0xff, Some(0), Some(new Date(0xffffffffL)), 0) + 0, 0xff, Some(0), Some(new DateTime(0xffffffffL)), 0) val headers = Set(h1, h2, h3, h4, h5) diff --git a/core/src/test/scala/com/nutomic/ensichat/core/util/DatabaseTest.scala b/core/src/test/scala/com/nutomic/ensichat/core/util/DatabaseTest.scala index 3dde962..5ad6458 100644 --- a/core/src/test/scala/com/nutomic/ensichat/core/util/DatabaseTest.scala +++ b/core/src/test/scala/com/nutomic/ensichat/core/util/DatabaseTest.scala @@ -6,11 +6,12 @@ import java.util.concurrent.CountDownLatch import com.nutomic.ensichat.core.body.Text import com.nutomic.ensichat.core.header.ContentHeader -import com.nutomic.ensichat.core.interfaces.{SettingsInterface, CallbackInterface} +import com.nutomic.ensichat.core.interfaces.{CallbackInterface, SettingsInterface} import com.nutomic.ensichat.core.util.DatabaseTest._ import com.nutomic.ensichat.core.{Address, Message, User} import junit.framework.Assert._ import junit.framework.TestCase +import org.joda.time.DateTime object DatabaseTest { @@ -20,11 +21,11 @@ object DatabaseTest { private val a4 = new Address("4444459893F8810C4024CFC951374AABA1F4DE6347A3D7D8E44918AD1FF2BA36") private val h1 = new ContentHeader(a2, a1, -1, Text.Type, Some(123), - Some(new GregorianCalendar(1970, 1, 1).getTime), 0) + Some(new DateTime(new GregorianCalendar(1970, 1, 1).getTime)), 0) private val h2 = new ContentHeader(a1, a3, -1, Text.Type, Some(8765), - Some(new GregorianCalendar(2014, 6, 10).getTime), 0) + Some(new DateTime(new GregorianCalendar(2014, 6, 10).getTime)), 0) private val h3 = new ContentHeader(a4, a2, -1, Text.Type, Some(77), - Some(new GregorianCalendar(2020, 11, 11).getTime), 0) + Some(new DateTime(new GregorianCalendar(2020, 11, 11).getTime)), 0) private val m1 = new Message(h1, new Text("first")) private val m2 = new Message(h2, new Text("second")) diff --git a/core/src/test/scala/com/nutomic/ensichat/core/util/MessageBufferTest.scala b/core/src/test/scala/com/nutomic/ensichat/core/util/MessageBufferTest.scala index 0412bc1..a362f4e 100644 --- a/core/src/test/scala/com/nutomic/ensichat/core/util/MessageBufferTest.scala +++ b/core/src/test/scala/com/nutomic/ensichat/core/util/MessageBufferTest.scala @@ -1,29 +1,41 @@ package com.nutomic.ensichat.core.util -import java.util.concurrent.{TimeUnit, CountDownLatch} +import java.util.concurrent.{CountDownLatch, TimeUnit} -import com.nutomic.ensichat.core.MessageTest +import com.nutomic.ensichat.core.header.ContentHeader +import com.nutomic.ensichat.core.{Message, Address, MessageTest} import junit.framework.TestCase +import org.joda.time.DateTime import org.junit.Assert._ class MessageBufferTest extends TestCase { + /** + * MessageBuffer checks the time of a message, we have to use the current time or items might + * time out. + */ + private def adjustMessageTime(m: Message) = + new Message(m.header.asInstanceOf[ContentHeader].copy(time=Some(DateTime.now)), m.body) + + val m1 = adjustMessageTime(MessageTest.m1) + val m2 = adjustMessageTime(MessageTest.m2) + def testGetMessages(): Unit = { - val buffer = new MessageBuffer(() => _) - buffer.addMessage(MessageTest.m1) - buffer.addMessage(MessageTest.m2) - val msgs = buffer.getMessages(MessageTest.m1.header.target) + val buffer = new MessageBuffer(Address.Null, () => _) + buffer.addMessage(m1) + buffer.addMessage(m2) + val msgs = buffer.getMessages(m1.header.target) assertEquals(1, msgs.size) - assertEquals(MessageTest.m1, msgs.head) + assertEquals(m1, msgs.head) } def testRetryMessage(): Unit = { val latch = new CountDownLatch(1) - val buffer = new MessageBuffer({e => - assertEquals(MessageTest.m1.header.target, e) + val buffer = new MessageBuffer(Address.Null, {e => + assertEquals(m1.header.target, e) latch.countDown() }) - buffer.addMessage(MessageTest.m1) + buffer.addMessage(m1) assertTrue(latch.await(15, TimeUnit.SECONDS)) } diff --git a/integration/src/main/scala/com.nutomic.ensichat.integration/LocalNode.scala b/integration/src/main/scala/com.nutomic.ensichat.integration/LocalNode.scala index 4a59ec9..0c9f166 100644 --- a/integration/src/main/scala/com.nutomic.ensichat.integration/LocalNode.scala +++ b/integration/src/main/scala/com.nutomic.ensichat.integration/LocalNode.scala @@ -50,9 +50,9 @@ class LocalNode(val index: Int, configFolder: File) extends CallbackInterface { override def get[T](key: String, default: T): T = values.get(key).map(_.asInstanceOf[T]).getOrElse(default) override def put[T](key: String, value: T): Unit = values += (key -> value.asInstanceOf[Any]) } - private val database = new Database(databaseFile, settings, this) val crypto = new Crypto(settings, keyFolder) + val database = new Database(databaseFile, settings, this) val connectionHandler = new ConnectionHandler(settings, database, this, crypto, 0, port) val eventQueue = new FifoStream[(EventType.EventType, Option[Message])]() diff --git a/integration/src/main/scala/com.nutomic.ensichat.integration/Main.scala b/integration/src/main/scala/com.nutomic.ensichat.integration/Main.scala index 9c9f348..d761f3f 100644 --- a/integration/src/main/scala/com.nutomic.ensichat.integration/Main.scala +++ b/integration/src/main/scala/com.nutomic.ensichat.integration/Main.scala @@ -1,14 +1,14 @@ package com.nutomic.ensichat.integration import java.io.File -import java.util.{TimerTask, Timer} import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.{Timer, TimerTask} import com.nutomic.ensichat.core.Crypto import com.nutomic.ensichat.core.body.Text import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration.{DurationLong, Duration} +import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.util.Try import scalax.file.Path @@ -17,11 +17,12 @@ import scalax.file.Path * Creates some local nodes, connects them and sends messages between them. * * If the test runs slow or fails, changing [[Crypto.PublicKeySize]] to 512 should help. + * + * These tests are somewhat fragile, and might fail randomly. It helps to run only one of + * the test functions at a time. */ object Main extends App { - // NOTE: These tests are somewhat fragile, and might fail randomly. It helps to run only - // one of the following functions at a time. testNeighborSending() testMeshMessageSending() testIndirectRelay() @@ -29,6 +30,7 @@ object Main extends App { testMessageDeliveryOnConnect() testSendDelayed() testRouteChange() + testMessageConfirmation() private def testNeighborSending(): Unit = { val node1 = Await.result(createNode(1), Duration.Inf) @@ -154,6 +156,20 @@ object Main extends App { System.out.println("Test send delayed successful!") } + /** + * Check that message confirmation is sent back after message was received. + */ + private def testMessageConfirmation(): Unit = { + val nodes = createNodes(2) + + connectNodes(nodes(0), nodes(1)) + sendMessage(nodes(0), nodes(1)) + assert(nodes(0).database.getMessages(nodes(1).crypto.localAddress).nonEmpty) + assert(nodes(0).database.getUnconfirmedMessages.isEmpty) + + nodes.foreach(_.stop()) + } + private def createNodes(count: Int): Seq[LocalNode] = { val nodes = Await.result(Future.sequence((0 until count).map(createNode)), Duration.Inf) nodes.foreach(n => System.out.println(s"Node ${n.index} has address ${n.crypto.localAddress}"))