From 7cfd1fd4f15fad77e4209820ea05ea9b7e7b82a5 Mon Sep 17 00:00:00 2001 From: YERMLV Date: Tue, 13 Nov 2018 16:52:47 +0200 Subject: [PATCH 1/3] Added route for state downloading --- build.sbt | 3 +- src/main/resources/application.conf | 1 + src/main/scala/mvp2/http/Routes.scala | 38 ++++++++++++++++++++---- src/main/scala/mvp2/utils/Settings.scala | 2 +- 4 files changed, 37 insertions(+), 7 deletions(-) diff --git a/build.sbt b/build.sbt index 6ffb5c1..adc010a 100644 --- a/build.sbt +++ b/build.sbt @@ -29,7 +29,8 @@ libraryDependencies ++= Seq( "ch.qos.logback" % "logback-core" % logbackVersion, "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test, "org.scalatest" %% "scalatest" % "3.0.5" % Test, - "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test + "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test, + "com.github.pathikrit" %% "better-files" % "3.6.0" ) resolvers ++= Seq("Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/", diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 24c5cf4..1a4d318 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -12,6 +12,7 @@ httpHost = "0.0.0.0" httpPort = 9151 timeout = 10 + enableStateDownload = false } ntp { diff --git a/src/main/scala/mvp2/http/Routes.scala b/src/main/scala/mvp2/http/Routes.scala index d9b368d..e40c9d6 100644 --- a/src/main/scala/mvp2/http/Routes.scala +++ b/src/main/scala/mvp2/http/Routes.scala @@ -1,26 +1,30 @@ package mvp2.http +import java.nio.file.Path import akka.http.scaladsl.server.Directives.complete import akka.actor.ActorSelection -import akka.http.scaladsl.model.{ContentTypes, HttpEntity} +import akka.http.scaladsl.model._ import mvp2.data.{LightKeyBlock, Transaction} import mvp2.messages.{CurrentBlockchainInfo, Get, GetLightChain} import mvp2.utils.Settings import akka.actor.ActorRefFactory -import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.StatusCodes.InternalServerError import akka.http.scaladsl.server.Route import io.circe.Json -import scala.concurrent.Future import io.circe.generic.auto._ import io.circe.syntax._ import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ +import scala.concurrent.Future import scala.language.postfixOps import akka.http.scaladsl.server.Directives._ import akka.pattern.ask import akka.util.{ByteString, Timeout} +import better.files.File +import better.files._ import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport -import mvp2.utils.EncodingUtils._ + +import scala.util.{Failure, Success, Try} case class Routes(settings: Settings, implicit val context: ActorRefFactory) extends FailFastCirceSupport { @@ -29,7 +33,11 @@ case class Routes(settings: Settings, implicit val context: ActorRefFactory) ext implicit val timeout: Timeout = Timeout(settings.apiSettings.timeout.second) implicit val ec: ExecutionContextExecutor = context.dispatcher - val route: Route = getTxs ~ apiInfo ~ chainInfo + val routes: Seq[Route] = + if (settings.apiSettings.enableStateDownload) Seq(getTxs, apiInfo, chainInfo, downloadState) + else Seq(getTxs, apiInfo, chainInfo) + + val route: Route = routes.reduce(_ ~ _) val publisher: ActorSelection = context.actorSelection("/user/starter/blockchainer/publisher") val informator: ActorSelection = context.actorSelection("/user/starter/informator") @@ -56,4 +64,24 @@ case class Routes(settings: Settings, implicit val context: ActorRefFactory) ext def chainInfo: Route = path("chainInfo")( toJsonResponse((informator ? GetLightChain).mapTo[List[LightKeyBlock]].map(_.asJson)) ) + + def downloadState: Route = (path("download") & get)( + createZip match { + case Success(path) => getFromFile(path.toFile, MediaTypes.`application/zip`) + case Failure(th) => + th.printStackTrace() + complete(HttpResponse(InternalServerError)) + } + ) + + private def createZip: Try[Path] = Try { + file"./state.zip".delete(true) + file"./tmp/".delete(true) + val dir: File = file"./tmp/".createDirectory() + File("./leveldb").list.map(_.copyToDirectory(dir)) + val zip: File = file"./state.zip".deleteOnExit() + dir.zipTo(file"./state.zip").deleteOnExit() + zip.path + } + } \ No newline at end of file diff --git a/src/main/scala/mvp2/utils/Settings.scala b/src/main/scala/mvp2/utils/Settings.scala index ad9b502..25fc3e2 100644 --- a/src/main/scala/mvp2/utils/Settings.scala +++ b/src/main/scala/mvp2/utils/Settings.scala @@ -17,7 +17,7 @@ case class Settings(port: Int, case class Node(host: String, port: Int) -case class ApiSettings(httpHost: String, httpPort: Int, timeout: Int) +case class ApiSettings(httpHost: String, httpPort: Int, timeout: Int, enableStateDownload: Boolean) case class InfluxSettings(host: String, port: Int, login: String, password: String) From 80032190c71fe3a7d4e5dcf473f0637d61459377 Mon Sep 17 00:00:00 2001 From: YERMLV Date: Wed, 14 Nov 2018 15:19:50 +0200 Subject: [PATCH 2/3] Added actor for state downloading --- project/plugins.sbt | 0 src/main/scala/mvp2/actors/Downloader.scala | 57 +++++++++++++++++++++ src/main/scala/mvp2/actors/Starter.scala | 6 ++- src/main/scala/mvp2/http/Routes.scala | 6 +-- src/main/scala/mvp2/utils/Settings.scala | 1 + 5 files changed, 65 insertions(+), 5 deletions(-) create mode 100644 project/plugins.sbt create mode 100644 src/main/scala/mvp2/actors/Downloader.scala diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..e69de29 diff --git a/src/main/scala/mvp2/actors/Downloader.scala b/src/main/scala/mvp2/actors/Downloader.scala new file mode 100644 index 0000000..9fedc1b --- /dev/null +++ b/src/main/scala/mvp2/actors/Downloader.scala @@ -0,0 +1,57 @@ +package mvp2.actors + +import java.nio.file.Paths +import akka.pattern.pipe +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes} +import akka.stream.scaladsl.FileIO +import better.files.File +import better.files._ +import scala.util.{Failure, Success} +import mvp2.MVP2._ +import scala.concurrent.ExecutionContext.Implicits.global + +class Downloader(downloadFrom: String) extends CommonActor { + override def specialBehavior: Receive = ??? + + override def preStart(): Unit = self ! downloadFrom + + override def receive: Receive = { + case address: String => + pipe(Http(context.system).singleRequest(HttpRequest(uri = s"http://$address/download"))).to(self) + case HttpResponse(StatusCodes.OK, _, entity, _) => + entity.dataBytes.runWith(FileIO.toPath(Paths.get(s"./state.zip"))).onComplete { + case Success(_) => + unzip() + println(1) + context.parent ! DownloadComplete + case Failure(th) => + th.printStackTrace() + context.parent ! DownloadComplete + context.stop(self) + } + case resp @ HttpResponse(code, _, _, _) => + println(code) + resp.discardEntityBytes() + context.parent ! DownloadComplete + context.stop(self) + } + + def unzip(): Unit = { + def moveAllFiles(from: File, dest: File): Unit = from.list.foreach(_.copyToDirectory(dest)) + val zip: File = file"./encry.zip" + val unzipped: File = zip.unzipTo(file"./state") + zip.delete(true) + file"./leveldb/journal".delete(true) + file"./leveldb/snapshots".delete(true) + val unzippedJournal: File = file"./state/journal" + val unzippedSnapshots: File = file"./state/snapshots" + val journalDir: File = file"./leveldb/journal".createDirectories() + val snapshotsDir: File = file"./leveldb/snapshots".createDirectories() + moveAllFiles(unzippedJournal, journalDir) + moveAllFiles(unzippedSnapshots, snapshotsDir) + unzipped.delete(true) + } +} + +case object DownloadComplete diff --git a/src/main/scala/mvp2/actors/Starter.scala b/src/main/scala/mvp2/actors/Starter.scala index 7aecf17..c6709cc 100644 --- a/src/main/scala/mvp2/actors/Starter.scala +++ b/src/main/scala/mvp2/actors/Starter.scala @@ -16,11 +16,15 @@ class Starter extends CommonActor { override def preStart(): Unit = { logger.info("Starting the Starter!") - bornKids() + settings.downloadStateFrom match { + case None => bornKids() + case Some(address) => context.actorOf(Props(classOf[Downloader], address), "downloader") + } } override def specialBehavior: Receive = { case message: InfoMessage => logger.info(message.info) + case DownloadComplete if settings.downloadStateFrom.nonEmpty => bornKids() } def bornKids(): Unit = { diff --git a/src/main/scala/mvp2/http/Routes.scala b/src/main/scala/mvp2/http/Routes.scala index e40c9d6..ee2a36f 100644 --- a/src/main/scala/mvp2/http/Routes.scala +++ b/src/main/scala/mvp2/http/Routes.scala @@ -23,8 +23,8 @@ import akka.util.{ByteString, Timeout} import better.files.File import better.files._ import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport - import scala.util.{Failure, Success, Try} +import mvp2.utils.EncodingUtils._ case class Routes(settings: Settings, implicit val context: ActorRefFactory) extends FailFastCirceSupport { @@ -68,9 +68,7 @@ case class Routes(settings: Settings, implicit val context: ActorRefFactory) ext def downloadState: Route = (path("download") & get)( createZip match { case Success(path) => getFromFile(path.toFile, MediaTypes.`application/zip`) - case Failure(th) => - th.printStackTrace() - complete(HttpResponse(InternalServerError)) + case Failure(_) => complete(HttpResponse(InternalServerError)) } ) diff --git a/src/main/scala/mvp2/utils/Settings.scala b/src/main/scala/mvp2/utils/Settings.scala index 25fc3e2..8cb5ac6 100644 --- a/src/main/scala/mvp2/utils/Settings.scala +++ b/src/main/scala/mvp2/utils/Settings.scala @@ -9,6 +9,7 @@ case class Settings(port: Int, blockPeriod: Long, biasForBlockPeriod: Long, newBlockchain: Boolean, + downloadStateFrom: Option[String] = None, apiSettings: ApiSettings, ntp: NetworkTimeProviderSettings, influx: Option[InfluxSettings], From abb04eafe0bb73013e7e0104100d14ca00a62913 Mon Sep 17 00:00:00 2001 From: YERMLV Date: Wed, 14 Nov 2018 15:23:14 +0200 Subject: [PATCH 3/3] Small refactoring --- src/main/scala/mvp2/actors/Downloader.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/scala/mvp2/actors/Downloader.scala b/src/main/scala/mvp2/actors/Downloader.scala index 9fedc1b..3c4ccd5 100644 --- a/src/main/scala/mvp2/actors/Downloader.scala +++ b/src/main/scala/mvp2/actors/Downloader.scala @@ -14,7 +14,10 @@ import scala.concurrent.ExecutionContext.Implicits.global class Downloader(downloadFrom: String) extends CommonActor { override def specialBehavior: Receive = ??? - override def preStart(): Unit = self ! downloadFrom + override def preStart(): Unit = { + logger.info("Starting downloader") + self ! downloadFrom + } override def receive: Receive = { case address: String =>