Added message confirmations to protocol (ref #22).
This commit is contained in:
parent
338a51fea9
commit
579d1f5717
16 changed files with 207 additions and 75 deletions
13
PROTOCOL.md
13
PROTOCOL.md
|
@ -85,7 +85,7 @@ version, type and ID, followed by the length of the message.
|
|||
0 1 2 3
|
||||
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||
| Version | Protocol-Type | Tokens | Hop Limit |
|
||||
| Version | Protocol-Type | Tokens | Hop Count |
|
||||
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||
| Length |
|
||||
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||
|
@ -327,3 +327,14 @@ Text the string to be transferred, encoded as UTF-8.
|
|||
|
||||
Contains the sender's name and status, which should be used for
|
||||
display to users.
|
||||
|
||||
### MessageReceived (Content-Type = 8)
|
||||
|
||||
0 1 2 3
|
||||
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||
| Message ID |
|
||||
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||
|
||||
Confirms that a previous content message has been received by the
|
||||
target node. Message ID is the ID of that message.
|
||||
|
|
|
@ -32,11 +32,12 @@ class ChatService extends Service {
|
|||
|
||||
private val callbackHandler = new CallbackHandler(this, notificationHandler)
|
||||
|
||||
lazy val database = new Database(getDatabasePath("database"), callbackHandler)
|
||||
private def settingsWrapper = new SettingsWrapper(this)
|
||||
|
||||
lazy val database = new Database(getDatabasePath("database"), settingsWrapper, callbackHandler)
|
||||
|
||||
private lazy val connectionHandler =
|
||||
new ConnectionHandler(new SettingsWrapper(this), database, callbackHandler,
|
||||
ChatService.newCrypto(this), 1)
|
||||
new ConnectionHandler(settingsWrapper, database, callbackHandler, ChatService.newCrypto(this), 1)
|
||||
|
||||
private val networkReceiver = new NetworkChangedReceiver()
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package com.nutomic.ensichat.core
|
||||
|
||||
import java.security.InvalidKeyException
|
||||
import java.util.Date
|
||||
|
||||
import com.nutomic.ensichat.core.body._
|
||||
import com.nutomic.ensichat.core.header.{AbstractHeader, ContentHeader, MessageHeader}
|
||||
|
@ -9,7 +8,7 @@ import com.nutomic.ensichat.core.interfaces._
|
|||
import com.nutomic.ensichat.core.internet.InternetInterface
|
||||
import com.nutomic.ensichat.core.util._
|
||||
import com.typesafe.scalalogging.Logger
|
||||
import org.joda.time.Duration
|
||||
import org.joda.time.{DateTime, Duration}
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.Future
|
||||
|
@ -39,7 +38,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
(a, m) => transmissionInterfaces.foreach(_.send(a, m)),
|
||||
noRouteFound)
|
||||
|
||||
private val messageBuffer = new MessageBuffer(requestRoute)
|
||||
private lazy val messageBuffer = new MessageBuffer(crypto.localAddress, requestRoute)
|
||||
|
||||
/**
|
||||
* Holds all known users.
|
||||
|
@ -64,6 +63,11 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
transmissionInterfaces +=
|
||||
new InternetInterface(this, crypto, settings, maxInternetConnections, port)
|
||||
transmissionInterfaces.foreach(_.create())
|
||||
database.getUnconfirmedMessages.foreach { m =>
|
||||
val encrypted = crypto.encryptAndSign(m)
|
||||
messageBuffer.addMessage(encrypted)
|
||||
requestRoute(encrypted.header.target)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,7 +85,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
FutureHelper {
|
||||
val messageId = settings.get("message_id", 0L)
|
||||
val header = new ContentHeader(crypto.localAddress, target, seqNumGenerator.next(),
|
||||
body.contentType, Some(messageId), Some(new Date()), AbstractHeader.InitialForwardingTokens)
|
||||
body.contentType, Some(messageId), Some(DateTime.now), AbstractHeader.InitialForwardingTokens)
|
||||
settings.put("message_id", messageId + 1)
|
||||
|
||||
val msg = new Message(header, body)
|
||||
|
@ -100,7 +104,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
val header = new MessageHeader(body.protocolType, crypto.localAddress, Address.Broadcast, seqNum, 0)
|
||||
|
||||
val signed = crypto.sign(new Message(header, body))
|
||||
logger.trace(s"sending new $signed")
|
||||
router.forwardMessage(signed)
|
||||
}
|
||||
|
||||
|
@ -110,7 +113,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
val header = new MessageHeader(body.protocolType, crypto.localAddress, replyTo, seqNum, 0)
|
||||
|
||||
val signed = crypto.sign(new Message(header, body))
|
||||
logger.trace(s"sending new $signed")
|
||||
router.forwardMessage(signed)
|
||||
}
|
||||
|
||||
|
@ -122,7 +124,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
val body = new RouteError(address, seqNum)
|
||||
|
||||
val signed = crypto.sign(new Message(header, body))
|
||||
logger.trace(s"sending new $signed")
|
||||
router.forwardMessage(signed)
|
||||
}
|
||||
|
||||
|
@ -149,7 +150,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
|
||||
msg.body match {
|
||||
case rreq: RouteRequest =>
|
||||
logger.trace(s"Received $msg")
|
||||
localRoutesInfo.addRoute(msg.header.origin, rreq.originSeqNum, previousHop, rreq.originMetric)
|
||||
resendMissingRouteMessages()
|
||||
// TODO: Respecting this causes the RERR test to fail. We have to fix the implementation
|
||||
|
@ -172,7 +172,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
}
|
||||
return
|
||||
case rrep: RouteReply =>
|
||||
logger.trace(s"Received $msg")
|
||||
localRoutesInfo.addRoute(msg.header.origin, rrep.originSeqNum, previousHop, 0)
|
||||
// TODO: See above (in RREQ handler).
|
||||
if (routeMessageInfo.isMessageRedundant(msg)) {
|
||||
|
@ -198,7 +197,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
router.forwardMessage(forwardMsg)
|
||||
return
|
||||
case rerr: RouteError =>
|
||||
logger.trace(s"Received $msg")
|
||||
localRoutesInfo.getRoute(rerr.address).foreach { route =>
|
||||
if (route.nextHop == msg.header.origin && (rerr.seqNum == 0 || rerr.seqNum > route.seqNum)) {
|
||||
localRoutesInfo.connectionClosed(rerr.address)
|
||||
|
@ -231,6 +229,19 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
return
|
||||
}
|
||||
|
||||
// This is necessary because a message is sent to the destination and relays seperately,
|
||||
// with different sequence numbers. Because of this, we also have to check the message ID
|
||||
// to avoid duplicate messages.
|
||||
if (database.getMessages(msg.header.origin).exists(m => m.header.origin == plainMsg.header.origin && m.header.messageId == plainMsg.header.messageId)) {
|
||||
logger.trace(s"Received message $msg again, ignoring")
|
||||
return
|
||||
}
|
||||
|
||||
if (plainMsg.body.contentType == Text.Type) {
|
||||
logger.trace(s"Sending confirmation for $plainMsg")
|
||||
sendTo(plainMsg.header.origin, new MessageReceived(plainMsg.header.messageId.get))
|
||||
}
|
||||
|
||||
onNewMessage(plainMsg)
|
||||
}
|
||||
|
||||
|
@ -273,6 +284,8 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
database.updateContact(contact)
|
||||
|
||||
callbacks.onConnectionsChanged()
|
||||
case mr: MessageReceived =>
|
||||
database.setMessageConfirmed(mr.messageId)
|
||||
case _ =>
|
||||
val origin = msg.header.origin
|
||||
if (origin != crypto.localAddress && database.getContact(origin).isEmpty)
|
||||
|
|
|
@ -262,6 +262,7 @@ class Crypto(settings: SettingsInterface, keyFolder: File) {
|
|||
val body = msg.header.asInstanceOf[ContentHeader].contentType match {
|
||||
case Text.Type => Text.read(decrypted)
|
||||
case UserInfo.Type => UserInfo.read(decrypted)
|
||||
case MessageReceived.Type => MessageReceived.read(decrypted)
|
||||
}
|
||||
new Message(msg.header, msg.crypto, body)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
package com.nutomic.ensichat.core.body
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
import com.nutomic.ensichat.core.Message
|
||||
import com.nutomic.ensichat.core.util.BufferUtils
|
||||
|
||||
import scala.Predef.String
|
||||
|
||||
object MessageReceived {
|
||||
|
||||
val Type = 8
|
||||
|
||||
/**
|
||||
* Constructs [[Text]] instance from byte array.
|
||||
*/
|
||||
def read(array: Array[Byte]): MessageReceived = {
|
||||
val b = ByteBuffer.wrap(array)
|
||||
val messageId = BufferUtils.getUnsignedInt(b)
|
||||
new MessageReceived(messageId)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Holds a plain text message.
|
||||
*/
|
||||
final case class MessageReceived(messageId: Long) extends MessageBody {
|
||||
|
||||
override def protocolType = -1
|
||||
|
||||
override def contentType = MessageReceived.Type
|
||||
|
||||
override def write: Array[Byte] = {
|
||||
val b = ByteBuffer.allocate(length)
|
||||
// TODO: This should be putUnsignedLong, but doesn't seem possible in the JVM.
|
||||
// Alternatively, we could use signed ints instead.
|
||||
BufferUtils.putUnsignedInt(b, messageId)
|
||||
b.array()
|
||||
}
|
||||
|
||||
override def length = 4
|
||||
|
||||
override def equals(a: Any): Boolean = a match {
|
||||
case o: MessageReceived => messageId == o.messageId
|
||||
case _ => false
|
||||
}
|
||||
|
||||
}
|
|
@ -1,10 +1,10 @@
|
|||
package com.nutomic.ensichat.core.header
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import java.util.Date
|
||||
|
||||
import com.nutomic.ensichat.core.Address
|
||||
import com.nutomic.ensichat.core.util.BufferUtils
|
||||
import org.joda.time.DateTime
|
||||
|
||||
object AbstractHeader {
|
||||
|
||||
|
@ -33,7 +33,7 @@ trait AbstractHeader {
|
|||
def target: Address
|
||||
def seqNum: Int
|
||||
def messageId: Option[Long] = None
|
||||
def time: Option[Date] = None
|
||||
def time: Option[DateTime] = None
|
||||
|
||||
/**
|
||||
* Writes the header to byte array.
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
package com.nutomic.ensichat.core.header
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import java.util.Date
|
||||
|
||||
import com.nutomic.ensichat.core.Address
|
||||
import com.nutomic.ensichat.core.util.BufferUtils
|
||||
import org.joda.time.DateTime
|
||||
|
||||
object ContentHeader {
|
||||
|
||||
|
@ -25,7 +25,7 @@ object ContentHeader {
|
|||
val time = BufferUtils.getUnsignedInt(b)
|
||||
|
||||
val ch = new ContentHeader(mh.origin, mh.target, mh.seqNum, contentType, Some(messageId),
|
||||
Some(new Date(time * 1000)), mh.tokens, mh.hopCount)
|
||||
Some(new DateTime(time * 1000)), mh.tokens, mh.hopCount)
|
||||
|
||||
val remaining = new Array[Byte](b.remaining())
|
||||
b.get(remaining, 0, b.remaining())
|
||||
|
@ -44,7 +44,7 @@ final case class ContentHeader(override val origin: Address,
|
|||
override val seqNum: Int,
|
||||
contentType: Int,
|
||||
override val messageId: Some[Long],
|
||||
override val time: Some[Date],
|
||||
override val time: Some[DateTime],
|
||||
override val tokens: Int,
|
||||
override val hopCount: Int = 0)
|
||||
extends AbstractHeader {
|
||||
|
@ -61,7 +61,7 @@ final case class ContentHeader(override val origin: Address,
|
|||
|
||||
BufferUtils.putUnsignedShort(b, contentType)
|
||||
BufferUtils.putUnsignedInt(b, messageId.get)
|
||||
BufferUtils.putUnsignedInt(b, time.get.getTime / 1000)
|
||||
BufferUtils.putUnsignedInt(b, time.get.getMillis / 1000)
|
||||
|
||||
b.array()
|
||||
}
|
||||
|
@ -73,7 +73,7 @@ final case class ContentHeader(override val origin: Address,
|
|||
super.equals(a) &&
|
||||
contentType == o.contentType &&
|
||||
messageId == o.messageId &&
|
||||
time.get.getTime / 1000 == o.time.get.getTime / 1000
|
||||
time.get.getMillis / 1000 == o.time.get.getMillis / 1000
|
||||
case _ => false
|
||||
}
|
||||
|
||||
|
|
|
@ -2,14 +2,14 @@ package com.nutomic.ensichat.core.util
|
|||
|
||||
import java.io.File
|
||||
import java.sql.DriverManager
|
||||
import java.util.Date
|
||||
|
||||
import com.nutomic.ensichat.core.body.Text
|
||||
import com.nutomic.ensichat.core.header.ContentHeader
|
||||
import com.nutomic.ensichat.core.interfaces.{CallbackInterface, SettingsInterface}
|
||||
import com.nutomic.ensichat.core.{Address, Message, User}
|
||||
import com.nutomic.ensichat.core.{Crypto, Address, Message, User}
|
||||
import com.typesafe.scalalogging.Logger
|
||||
import org.joda.time
|
||||
import org.joda.time.DateTime
|
||||
import slick.driver.H2Driver.api._
|
||||
|
||||
import scala.concurrent.Await
|
||||
|
@ -30,29 +30,30 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb
|
|||
|
||||
private val DatabasePath = "jdbc:h2:" + path.getAbsolutePath + ";DATABASE_TO_UPPER=false"
|
||||
|
||||
private class Messages(tag: Tag) extends Table[Message](tag, "MESSAGES") {
|
||||
def id = primaryKey("id", (origin, messageId))
|
||||
def origin = column[String]("origin")
|
||||
def target = column[String]("target")
|
||||
def messageId = column[Long]("message_id")
|
||||
def text = column[String]("text")
|
||||
def date = column[Long]("date")
|
||||
def tokens = column[Int]("tokens")
|
||||
def * = (origin, target, messageId, text, date, tokens) <> [Message, (String, String, Long, String, Long, Int)]( {
|
||||
private class Messages(tag: Tag) extends Table[(Message, Boolean)](tag, "MESSAGES") {
|
||||
def id = primaryKey("id", (origin, messageId))
|
||||
def origin = column[String]("origin")
|
||||
def target = column[String]("target")
|
||||
def messageId = column[Long]("message_id")
|
||||
def text = column[String]("text")
|
||||
def date = column[Long]("date")
|
||||
def tokens = column[Int]("tokens")
|
||||
def confirmedDelivered = column[Boolean]("confirmed_delivered")
|
||||
def * = (origin, target, messageId, text, date, tokens, confirmedDelivered) <> [(Message, Boolean), (String, String, Long, String, Long, Int, Boolean)]( {
|
||||
tuple =>
|
||||
val header = new ContentHeader(new Address(tuple._1),
|
||||
new Address(tuple._2),
|
||||
-1,
|
||||
Text.Type,
|
||||
Some(tuple._3),
|
||||
Some(new Date(tuple._5)),
|
||||
Some(new DateTime(tuple._5)),
|
||||
tuple._6)
|
||||
val body = new Text(tuple._4)
|
||||
new Message(header, body)
|
||||
(new Message(header, body), tuple._7)
|
||||
}, message =>
|
||||
Option((message.header.origin.toString(), message.header.target.toString(),
|
||||
message.header.messageId.get, message.body.asInstanceOf[Text].text,
|
||||
message.header.time.get.getTime, message.header.tokens))
|
||||
Option((message._1.header.origin.toString(), message._1.header.target.toString(),
|
||||
message._1.header.messageId.get, message._1.body.asInstanceOf[Text].text,
|
||||
message._1.header.time.get.getMillis, message._1.header.tokens, message._2))
|
||||
)
|
||||
}
|
||||
private val messages = TableQuery[Messages]
|
||||
|
@ -109,15 +110,13 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb
|
|||
connection.createStatement().executeUpdate("ALTER TABLE MESSAGES ADD COLUMN (tokens INT);")
|
||||
connection.commit()
|
||||
Await.result(db.run(knownDevices.schema.create), Duration.Inf)
|
||||
connection.createStatement().executeUpdate("ALTER TABLE MESSAGES ADD COLUMN (confirmed_delivered INT);")
|
||||
connection.commit()
|
||||
}
|
||||
connection.close()
|
||||
settings.put(DatabaseVersionKey, DatabaseVersion)
|
||||
}
|
||||
|
||||
// Apparently, slick doesn't support ALTER TABLE, so we have to write raw SQL for this...
|
||||
{
|
||||
}
|
||||
|
||||
def close(): Unit = {
|
||||
Await.result(db.shutdown, Duration.Inf)
|
||||
}
|
||||
|
@ -126,7 +125,8 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb
|
|||
* Inserts the given new message into the database.
|
||||
*/
|
||||
def onMessageReceived(msg: Message): Unit = msg.body match {
|
||||
case _: Text => Await.result(db.run(messages += msg), Duration.Inf)
|
||||
case _: Text =>
|
||||
Await.result(db.run(messages += (msg, false)), Duration.Inf)
|
||||
case _ =>
|
||||
}
|
||||
|
||||
|
@ -134,7 +134,7 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb
|
|||
val query = messages.filter { m =>
|
||||
m.origin === address.toString || m.target === address.toString
|
||||
}
|
||||
Await.result(db.run(query.result), Duration.Inf)
|
||||
Await.result(db.run(query.result), Duration.Inf).map(_._1)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -201,4 +201,24 @@ class Database(path: File, settings: SettingsInterface, callbackInterface: Callb
|
|||
Await.result(db.run(query), Duration.Inf)
|
||||
}
|
||||
|
||||
def setMessageConfirmed(messageId: Long): Unit = {
|
||||
val localAddress = new Address(settings.get(Crypto.LocalAddressKey, ""))
|
||||
val query = messages.filter { c =>
|
||||
c.origin === localAddress.toString &&
|
||||
c.messageId === messageId
|
||||
}
|
||||
.map(_.confirmedDelivered)
|
||||
.update(true)
|
||||
Await.result(db.run(query), Duration.Inf)
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all addresses sent by us that have confirmedDlivered = false.
|
||||
*/
|
||||
def getUnconfirmedMessages: Set[Message] = {
|
||||
val localAddress = new Address(settings.get(Crypto.LocalAddressKey, ""))
|
||||
val query = messages.filter(m => m.origin === localAddress.toString && !m.confirmedDelivered)
|
||||
Await.result(db.run(query.result), Duration.Inf).map(_._1).toSet
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ private[core] object LocalRoutesInfo {
|
|||
|
||||
/**
|
||||
* [[RouteStates.Idle]]:
|
||||
* A route that is known, but has not been used in the last [[ActiveInterval.
|
||||
* A route that is known, but has not been used in the last [[ActiveInterval]].
|
||||
* [[RouteStates.Active]]:
|
||||
* A route that is known, and has been used in the last [[ActiveInterval]].
|
||||
* [[RouteStates.Invalid]]:
|
||||
|
|
|
@ -3,12 +3,12 @@ package com.nutomic.ensichat.core.util
|
|||
import java.util.{TimerTask, Timer}
|
||||
|
||||
import com.nutomic.ensichat.core.{Address, Message}
|
||||
import org.joda.time.{DateTime, Duration}
|
||||
import org.joda.time.{Seconds, DateTime, Duration}
|
||||
|
||||
/**
|
||||
* Contains messages that couldn't be forwarded because we don't know a route.
|
||||
*/
|
||||
class MessageBuffer(retryMessageSending: (Address) => Unit) {
|
||||
class MessageBuffer(localAddress: Address, retryMessageSending: (Address) => Unit) {
|
||||
|
||||
/**
|
||||
* The maximum number of times we retry to deliver a message.
|
||||
|
@ -29,7 +29,13 @@ class MessageBuffer(retryMessageSending: (Address) => Unit) {
|
|||
}
|
||||
|
||||
def addMessage(msg: Message): Unit = {
|
||||
val newEntry = new BufferEntry(msg, DateTime.now, 0)
|
||||
// For old messages added back from database, find their retry count from send time and offset.
|
||||
val retryCount =
|
||||
(0 to 6).find { i =>
|
||||
msg.header.time.get.plus(calculateNextRetryOffset(i)).isAfter(DateTime.now)
|
||||
}
|
||||
.getOrElse(6)
|
||||
val newEntry = new BufferEntry(msg, DateTime.now, retryCount)
|
||||
values += newEntry
|
||||
retryMessage(newEntry)
|
||||
}
|
||||
|
@ -86,7 +92,7 @@ class MessageBuffer(retryMessageSending: (Address) => Unit) {
|
|||
|
||||
private def handleTimeouts(): Unit = {
|
||||
values = values.filter { e =>
|
||||
e.retryCount < MaxRetryCount
|
||||
e.retryCount < MaxRetryCount && e.message.header.origin != localAddress
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
package com.nutomic.ensichat.core
|
||||
|
||||
import java.util.GregorianCalendar
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import java.util.{Date, GregorianCalendar}
|
||||
|
||||
import com.nutomic.ensichat.core.body.{Text, UserInfo}
|
||||
import com.nutomic.ensichat.core.header.ContentHeader
|
||||
import com.nutomic.ensichat.core.util.LocalRoutesInfo
|
||||
import junit.framework.TestCase
|
||||
import org.joda.time.DateTime
|
||||
import org.junit.Assert._
|
||||
|
||||
class RouterTest extends TestCase {
|
||||
|
@ -93,14 +94,14 @@ class RouterTest extends TestCase {
|
|||
|
||||
def testHopLimit(): Unit = Range(19, 22).foreach { i =>
|
||||
val msg = new Message(
|
||||
new ContentHeader(AddressTest.a1, AddressTest.a2, 1, 1, Some(1), Some(new Date()), 3, i), new Text(""))
|
||||
new ContentHeader(AddressTest.a1, AddressTest.a2, 1, 1, Some(1), Some(DateTime.now), 3, i), new Text(""))
|
||||
val router = new Router(new LocalRoutesInfo(neighbors), (a, m) => fail(), _ => ())
|
||||
router.forwardMessage(msg)
|
||||
}
|
||||
|
||||
private def generateMessage(sender: Address, receiver: Address, seqNum: Int): Message = {
|
||||
val header = new ContentHeader(sender, receiver, seqNum, UserInfo.Type, Some(5),
|
||||
Some(new GregorianCalendar(2014, 6, 10).getTime), 3)
|
||||
Some(new DateTime(new GregorianCalendar(2014, 6, 10).getTime)), 3)
|
||||
new Message(header, new UserInfo("", ""))
|
||||
}
|
||||
|
||||
|
|
|
@ -5,24 +5,25 @@ import java.util.{Date, GregorianCalendar}
|
|||
import com.nutomic.ensichat.core.body.Text
|
||||
import com.nutomic.ensichat.core.{Address, AddressTest}
|
||||
import junit.framework.TestCase
|
||||
import org.joda.time.DateTime
|
||||
import org.junit.Assert._
|
||||
|
||||
object ContentHeaderTest {
|
||||
|
||||
val h1 = new ContentHeader(AddressTest.a1, AddressTest.a2, 1234,
|
||||
Text.Type, Some(123), Some(new GregorianCalendar(1970, 1, 1).getTime), 3)
|
||||
Text.Type, Some(123), Some(new DateTime(new GregorianCalendar(1970, 1, 1).getTime)), 3)
|
||||
|
||||
val h2 = new ContentHeader(AddressTest.a1, AddressTest.a3,
|
||||
30000, Text.Type, Some(8765), Some(new GregorianCalendar(2014, 6, 10).getTime), 2)
|
||||
30000, Text.Type, Some(8765), Some(new DateTime(new GregorianCalendar(2014, 6, 10))), 2)
|
||||
|
||||
val h3 = new ContentHeader(AddressTest.a4, AddressTest.a2,
|
||||
250, Text.Type, Some(77), Some(new GregorianCalendar(2020, 11, 11).getTime), 1)
|
||||
250, Text.Type, Some(77), Some(new DateTime(new GregorianCalendar(2020, 11, 11).getTime)), 1)
|
||||
|
||||
val h4 = new ContentHeader(Address.Null, Address.Broadcast,
|
||||
ContentHeader.SeqNumRange.last, 0, Some(0xffff), Some(new Date(0L)), 6)
|
||||
ContentHeader.SeqNumRange.last, 0, Some(0xffff), Some(new DateTime(0L)), 6)
|
||||
|
||||
val h5 = new ContentHeader(Address.Broadcast, Address.Null,
|
||||
0, 0xff, Some(0), Some(new Date(0xffffffffL)), 0)
|
||||
0, 0xff, Some(0), Some(new DateTime(0xffffffffL)), 0)
|
||||
|
||||
val headers = Set(h1, h2, h3, h4, h5)
|
||||
|
||||
|
|
|
@ -6,11 +6,12 @@ import java.util.concurrent.CountDownLatch
|
|||
|
||||
import com.nutomic.ensichat.core.body.Text
|
||||
import com.nutomic.ensichat.core.header.ContentHeader
|
||||
import com.nutomic.ensichat.core.interfaces.{SettingsInterface, CallbackInterface}
|
||||
import com.nutomic.ensichat.core.interfaces.{CallbackInterface, SettingsInterface}
|
||||
import com.nutomic.ensichat.core.util.DatabaseTest._
|
||||
import com.nutomic.ensichat.core.{Address, Message, User}
|
||||
import junit.framework.Assert._
|
||||
import junit.framework.TestCase
|
||||
import org.joda.time.DateTime
|
||||
|
||||
object DatabaseTest {
|
||||
|
||||
|
@ -20,11 +21,11 @@ object DatabaseTest {
|
|||
private val a4 = new Address("4444459893F8810C4024CFC951374AABA1F4DE6347A3D7D8E44918AD1FF2BA36")
|
||||
|
||||
private val h1 = new ContentHeader(a2, a1, -1, Text.Type, Some(123),
|
||||
Some(new GregorianCalendar(1970, 1, 1).getTime), 0)
|
||||
Some(new DateTime(new GregorianCalendar(1970, 1, 1).getTime)), 0)
|
||||
private val h2 = new ContentHeader(a1, a3, -1, Text.Type, Some(8765),
|
||||
Some(new GregorianCalendar(2014, 6, 10).getTime), 0)
|
||||
Some(new DateTime(new GregorianCalendar(2014, 6, 10).getTime)), 0)
|
||||
private val h3 = new ContentHeader(a4, a2, -1, Text.Type, Some(77),
|
||||
Some(new GregorianCalendar(2020, 11, 11).getTime), 0)
|
||||
Some(new DateTime(new GregorianCalendar(2020, 11, 11).getTime)), 0)
|
||||
|
||||
private val m1 = new Message(h1, new Text("first"))
|
||||
private val m2 = new Message(h2, new Text("second"))
|
||||
|
|
|
@ -1,29 +1,41 @@
|
|||
package com.nutomic.ensichat.core.util
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
|
||||
import com.nutomic.ensichat.core.MessageTest
|
||||
import com.nutomic.ensichat.core.header.ContentHeader
|
||||
import com.nutomic.ensichat.core.{Message, Address, MessageTest}
|
||||
import junit.framework.TestCase
|
||||
import org.joda.time.DateTime
|
||||
import org.junit.Assert._
|
||||
|
||||
class MessageBufferTest extends TestCase {
|
||||
|
||||
/**
|
||||
* MessageBuffer checks the time of a message, we have to use the current time or items might
|
||||
* time out.
|
||||
*/
|
||||
private def adjustMessageTime(m: Message) =
|
||||
new Message(m.header.asInstanceOf[ContentHeader].copy(time=Some(DateTime.now)), m.body)
|
||||
|
||||
val m1 = adjustMessageTime(MessageTest.m1)
|
||||
val m2 = adjustMessageTime(MessageTest.m2)
|
||||
|
||||
def testGetMessages(): Unit = {
|
||||
val buffer = new MessageBuffer(() => _)
|
||||
buffer.addMessage(MessageTest.m1)
|
||||
buffer.addMessage(MessageTest.m2)
|
||||
val msgs = buffer.getMessages(MessageTest.m1.header.target)
|
||||
val buffer = new MessageBuffer(Address.Null, () => _)
|
||||
buffer.addMessage(m1)
|
||||
buffer.addMessage(m2)
|
||||
val msgs = buffer.getMessages(m1.header.target)
|
||||
assertEquals(1, msgs.size)
|
||||
assertEquals(MessageTest.m1, msgs.head)
|
||||
assertEquals(m1, msgs.head)
|
||||
}
|
||||
|
||||
def testRetryMessage(): Unit = {
|
||||
val latch = new CountDownLatch(1)
|
||||
val buffer = new MessageBuffer({e =>
|
||||
assertEquals(MessageTest.m1.header.target, e)
|
||||
val buffer = new MessageBuffer(Address.Null, {e =>
|
||||
assertEquals(m1.header.target, e)
|
||||
latch.countDown()
|
||||
})
|
||||
buffer.addMessage(MessageTest.m1)
|
||||
buffer.addMessage(m1)
|
||||
assertTrue(latch.await(15, TimeUnit.SECONDS))
|
||||
}
|
||||
|
||||
|
|
|
@ -50,9 +50,9 @@ class LocalNode(val index: Int, configFolder: File) extends CallbackInterface {
|
|||
override def get[T](key: String, default: T): T = values.get(key).map(_.asInstanceOf[T]).getOrElse(default)
|
||||
override def put[T](key: String, value: T): Unit = values += (key -> value.asInstanceOf[Any])
|
||||
}
|
||||
private val database = new Database(databaseFile, settings, this)
|
||||
|
||||
val crypto = new Crypto(settings, keyFolder)
|
||||
val database = new Database(databaseFile, settings, this)
|
||||
val connectionHandler = new ConnectionHandler(settings, database, this, crypto, 0, port)
|
||||
val eventQueue = new FifoStream[(EventType.EventType, Option[Message])]()
|
||||
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
package com.nutomic.ensichat.integration
|
||||
|
||||
import java.io.File
|
||||
import java.util.{TimerTask, Timer}
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import java.util.{Timer, TimerTask}
|
||||
|
||||
import com.nutomic.ensichat.core.Crypto
|
||||
import com.nutomic.ensichat.core.body.Text
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.duration.{DurationLong, Duration}
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.concurrent.{Await, Future}
|
||||
import scala.util.Try
|
||||
import scalax.file.Path
|
||||
|
@ -17,11 +17,12 @@ import scalax.file.Path
|
|||
* Creates some local nodes, connects them and sends messages between them.
|
||||
*
|
||||
* If the test runs slow or fails, changing [[Crypto.PublicKeySize]] to 512 should help.
|
||||
*
|
||||
* These tests are somewhat fragile, and might fail randomly. It helps to run only one of
|
||||
* the test functions at a time.
|
||||
*/
|
||||
object Main extends App {
|
||||
|
||||
// NOTE: These tests are somewhat fragile, and might fail randomly. It helps to run only
|
||||
// one of the following functions at a time.
|
||||
testNeighborSending()
|
||||
testMeshMessageSending()
|
||||
testIndirectRelay()
|
||||
|
@ -29,6 +30,7 @@ object Main extends App {
|
|||
testMessageDeliveryOnConnect()
|
||||
testSendDelayed()
|
||||
testRouteChange()
|
||||
testMessageConfirmation()
|
||||
|
||||
private def testNeighborSending(): Unit = {
|
||||
val node1 = Await.result(createNode(1), Duration.Inf)
|
||||
|
@ -154,6 +156,20 @@ object Main extends App {
|
|||
System.out.println("Test send delayed successful!")
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that message confirmation is sent back after message was received.
|
||||
*/
|
||||
private def testMessageConfirmation(): Unit = {
|
||||
val nodes = createNodes(2)
|
||||
|
||||
connectNodes(nodes(0), nodes(1))
|
||||
sendMessage(nodes(0), nodes(1))
|
||||
assert(nodes(0).database.getMessages(nodes(1).crypto.localAddress).nonEmpty)
|
||||
assert(nodes(0).database.getUnconfirmedMessages.isEmpty)
|
||||
|
||||
nodes.foreach(_.stop())
|
||||
}
|
||||
|
||||
private def createNodes(count: Int): Seq[LocalNode] = {
|
||||
val nodes = Await.result(Future.sequence((0 until count).map(createNode)), Duration.Inf)
|
||||
nodes.foreach(n => System.out.println(s"Node ${n.index} has address ${n.crypto.localAddress}"))
|
||||
|
|
Reference in a new issue