Store total node connection time in database.
This commit is contained in:
parent
23ee0f6da7
commit
1add05b72f
6 changed files with 61 additions and 23 deletions
|
@ -12,6 +12,7 @@ import com.nutomic.ensichat.core.body.ConnectionInfo
|
|||
import com.nutomic.ensichat.core.interfaces.{SettingsInterface, TransmissionInterface}
|
||||
import com.nutomic.ensichat.core.{Address, ConnectionHandler, Message}
|
||||
import com.nutomic.ensichat.service.ChatService
|
||||
import org.joda.time.{DateTime, Duration}
|
||||
|
||||
import scala.collection.immutable.HashMap
|
||||
|
||||
|
@ -169,12 +170,13 @@ class BluetoothInterface(context: Context, mainHandler: Handler,
|
|||
/**
|
||||
* Removes device from active connections.
|
||||
*/
|
||||
def onConnectionClosed(device: Device, socket: BluetoothSocket): Unit = {
|
||||
val address = getAddressForDevice(device.id)
|
||||
devices -= device.id
|
||||
connections -= device.id
|
||||
addressDeviceMap = addressDeviceMap.filterNot(_._2 == device.id)
|
||||
connectionHandler.onConnectionClosed(address)
|
||||
def onConnectionClosed(connectionOpened: DateTime, deviceId: Device.ID): Unit = {
|
||||
val address = getAddressForDevice(deviceId)
|
||||
devices -= deviceId
|
||||
connections -= deviceId
|
||||
addressDeviceMap = addressDeviceMap.filterNot(_._2 == deviceId)
|
||||
val connectionDuration = new Duration(connectionOpened, DateTime.now)
|
||||
connectionHandler.onConnectionClosed(address, connectionDuration)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -9,6 +9,7 @@ import com.nutomic.ensichat.core.Message.ReadMessageException
|
|||
import com.nutomic.ensichat.core.body.ConnectionInfo
|
||||
import com.nutomic.ensichat.core.header.MessageHeader
|
||||
import com.nutomic.ensichat.core.{Address, Crypto, Message}
|
||||
import org.joda.time.DateTime
|
||||
|
||||
/**
|
||||
* Transfers data between connnected devices.
|
||||
|
@ -17,8 +18,11 @@ import com.nutomic.ensichat.core.{Address, Crypto, Message}
|
|||
* @param socket An open socket to the given device.
|
||||
* @param onReceive Called when a message was received from the other device.
|
||||
*/
|
||||
class BluetoothTransferThread(context: Context, device: Device, socket: BluetoothSocket, handler: BluetoothInterface,
|
||||
crypto: Crypto, onReceive: (Message, Device.ID) => Unit) extends Thread {
|
||||
class BluetoothTransferThread(context: Context, device: Device, socket: BluetoothSocket,
|
||||
handler: BluetoothInterface, crypto: Crypto,
|
||||
onReceive: (Message, Device.ID) => Unit) extends Thread {
|
||||
|
||||
private val connectionOpened = DateTime.now
|
||||
|
||||
private val Tag = "TransferThread"
|
||||
|
||||
|
@ -102,7 +106,7 @@ class BluetoothTransferThread(context: Context, device: Device, socket: Bluetoot
|
|||
} catch {
|
||||
case e: IOException => Log.e(Tag, "Failed to close socket", e);
|
||||
} finally {
|
||||
handler.onConnectionClosed(new Device(device.btDevice.get, false), null)
|
||||
handler.onConnectionClosed(connectionOpened, device.id)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -320,10 +320,17 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
|
|||
true
|
||||
}
|
||||
|
||||
def onConnectionClosed(address: Address): Unit = {
|
||||
/**
|
||||
* Called by [[TransmissionInterface]] when a connection is closed.
|
||||
*
|
||||
* @param address The address of the connected device.
|
||||
* @param duration The time that we were connected to the device.
|
||||
*/
|
||||
def onConnectionClosed(address: Address, duration: Duration): Unit = {
|
||||
localRoutesInfo.connectionClosed(address)
|
||||
.foreach(routeError(_, None))
|
||||
callbacks.onConnectionsChanged()
|
||||
database.insertOrUpdateKnownDevice(address, duration)
|
||||
}
|
||||
|
||||
def connections(): Set[Address] = transmissionInterfaces.flatMap(_.getConnections)
|
||||
|
|
|
@ -8,6 +8,7 @@ import com.nutomic.ensichat.core.body.ConnectionInfo
|
|||
import com.nutomic.ensichat.core.header.MessageHeader
|
||||
import com.nutomic.ensichat.core.{Address, Crypto, Message}
|
||||
import com.typesafe.scalalogging.Logger
|
||||
import org.joda.time.DateTime
|
||||
|
||||
/**
|
||||
* Encapsulates an active connection to another node.
|
||||
|
@ -17,6 +18,8 @@ private[core] class InternetConnectionThread(socket: Socket, crypto: Crypto,
|
|||
onReceive: (Message, InternetConnectionThread) => Unit)
|
||||
extends Thread {
|
||||
|
||||
val connectionOpened = DateTime.now
|
||||
|
||||
private val logger = Logger(this.getClass)
|
||||
|
||||
private val inStream: InputStream =
|
||||
|
|
|
@ -7,6 +7,7 @@ import com.nutomic.ensichat.core.interfaces.{SettingsInterface, TransmissionInte
|
|||
import com.nutomic.ensichat.core.util.FutureHelper
|
||||
import com.nutomic.ensichat.core.{Address, ConnectionHandler, Crypto, Message}
|
||||
import com.typesafe.scalalogging.Logger
|
||||
import org.joda.time.{DateTime, Duration}
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.Future
|
||||
|
@ -102,7 +103,8 @@ private[core] class InternetInterface(connectionHandler: ConnectionHandler, cryp
|
|||
logger.trace("Connection closed to " + ad)
|
||||
connections -= connectionThread
|
||||
addressDeviceMap -= ad
|
||||
connectionHandler.onConnectionClosed(ad)
|
||||
val connectionDuration = new Duration(connectionThread.connectionOpened, DateTime.now)
|
||||
connectionHandler.onConnectionClosed(ad, connectionDuration)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import com.nutomic.ensichat.core.header.ContentHeader
|
|||
import com.nutomic.ensichat.core.interfaces.CallbackInterface
|
||||
import com.nutomic.ensichat.core.{Address, Message, User}
|
||||
import com.typesafe.scalalogging.Logger
|
||||
import org.joda.time
|
||||
import slick.driver.H2Driver.api._
|
||||
|
||||
import scala.concurrent.Await
|
||||
|
@ -30,20 +31,21 @@ class Database(path: File, callbackInterface: CallbackInterface) {
|
|||
def messageId = column[Long]("message_id")
|
||||
def text = column[String]("text")
|
||||
def date = column[Long]("date")
|
||||
def * = (origin, target, messageId, text, date).<> [Message, (String, String, Long, String, Long)]( { tuple =>
|
||||
val header = new ContentHeader(new Address(tuple._1),
|
||||
new Address(tuple._2),
|
||||
-1,
|
||||
Text.Type,
|
||||
Some(tuple._3),
|
||||
Some(new Date(tuple._5)))
|
||||
val body = new Text(tuple._4)
|
||||
new Message(header, body)
|
||||
}, { message =>
|
||||
def * = (origin, target, messageId, text, date) <> [Message, (String, String, Long, String, Long)]( {
|
||||
tuple =>
|
||||
val header = new ContentHeader(new Address(tuple._1),
|
||||
new Address(tuple._2),
|
||||
-1,
|
||||
Text.Type,
|
||||
Some(tuple._3),
|
||||
Some(new Date(tuple._5)))
|
||||
val body = new Text(tuple._4)
|
||||
new Message(header, body)
|
||||
}, message =>
|
||||
Option((message.header.origin.toString(), message.header.target.toString(),
|
||||
message.header.messageId.get, message.body.asInstanceOf[Text].text,
|
||||
message.header.time.get.getTime))
|
||||
})
|
||||
)
|
||||
}
|
||||
private val messages = TableQuery[Messages]
|
||||
|
||||
|
@ -51,11 +53,20 @@ class Database(path: File, callbackInterface: CallbackInterface) {
|
|||
def address = column[String]("address", O.PrimaryKey)
|
||||
def name = column[String]("name")
|
||||
def status = column[String]("status")
|
||||
def wrappedAddress = address.<> [Address, String](new Address(_), a => Option(a.toString()))
|
||||
def wrappedAddress = address <> [Address, String](new Address(_), a => Option(a.toString))
|
||||
def * = (wrappedAddress, name, status) <> (User.tupled, User.unapply)
|
||||
}
|
||||
private val contacts = TableQuery[Contacts]
|
||||
|
||||
private class KnownDevices(tag: Tag) extends Table[(Address, time.Duration)](tag, "KNOWN_DEVICES") {
|
||||
def address = column[String]("address", O.PrimaryKey)
|
||||
def totalConnectionSeconds = column[Long]("total_connection_seconds")
|
||||
def * = (address, totalConnectionSeconds) <> [(Address, time.Duration), (String, Long)](
|
||||
tuple => (new Address(tuple._1), time.Duration.standardSeconds(tuple._2)),
|
||||
tuple => Option((tuple._1.toString, tuple._2.getStandardSeconds)))
|
||||
}
|
||||
private val knownDevices = TableQuery[KnownDevices]
|
||||
|
||||
private val db = Database.forURL("jdbc:h2:" + path.getAbsolutePath, driver = "org.h2.Driver")
|
||||
|
||||
// Create tables if database doesn't exist.
|
||||
|
@ -122,4 +133,13 @@ class Database(path: File, callbackInterface: CallbackInterface) {
|
|||
callbackInterface.onContactsUpdated()
|
||||
}
|
||||
|
||||
def insertOrUpdateKnownDevice(address: Address, connectionTime: time.Duration): Unit = {
|
||||
val query = knownDevices.insertOrUpdate((address, connectionTime))
|
||||
Await.result(db.run(query), Duration.Inf)
|
||||
}
|
||||
|
||||
def getKnownDevices: Seq[(Address, time.Duration)] = {
|
||||
Await.result(db.run(knownDevices.result), Duration.Inf)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Reference in a new issue