From 2643f9a49d8fc6945fa39f2c34ef4f6bef202be5 Mon Sep 17 00:00:00 2001 From: aleksandr Date: Thu, 28 Nov 2019 15:24:19 +0300 Subject: [PATCH 1/6] add ModifierWithBytes, change modsCache --- .../DownloadedModifiersValidator.scala | 18 ++++++---- .../scala/encry/view/ModifiersCache.scala | 33 ++++++++++--------- .../scala/encry/view/NodeViewHolder.scala | 14 ++++---- 3 files changed, 37 insertions(+), 28 deletions(-) diff --git a/src/main/scala/encry/network/DownloadedModifiersValidator.scala b/src/main/scala/encry/network/DownloadedModifiersValidator.scala index 1c45f2cdde..b6c7c965a2 100644 --- a/src/main/scala/encry/network/DownloadedModifiersValidator.scala +++ b/src/main/scala/encry/network/DownloadedModifiersValidator.scala @@ -1,13 +1,13 @@ package encry.network import TransactionProto.TransactionProtoMessage -import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props } -import akka.dispatch.{ PriorityGenerator, UnboundedStablePriorityMailbox } +import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props} +import akka.dispatch.{PriorityGenerator, UnboundedStablePriorityMailbox} import com.typesafe.config.Config import com.typesafe.scalalogging.StrictLogging import encry.modifiers.history.HeaderUtils import encry.network.BlackList.BanReason._ -import encry.network.DownloadedModifiersValidator.{ InvalidModifier, ModifiersForValidating } +import encry.network.DownloadedModifiersValidator.{InvalidModifier, ModifierWithBytes, ModifiersForValidating} import encry.network.NodeViewSynchronizer.ReceivableMessages.UpdatedHistory import encry.network.PeerConnectionHandler.ConnectedPeer import encry.network.PeersKeeper.BanPeer @@ -16,9 +16,11 @@ import encry.stats.StatsSender.ValidatedModifierFromNetwork import encry.view.NodeViewHolder.ReceivableMessages.ModifierFromRemote import encry.view.history.History import encry.view.mempool.MemoryPool.NewTransaction -import org.encryfoundation.common.modifiers.mempool.transaction.{ Transaction, TransactionProtoSerializer } -import org.encryfoundation.common.utils.TaggedTypes.{ ModifierId, ModifierTypeId } -import scala.util.{ Failure, Success, Try } +import org.encryfoundation.common.modifiers.PersistentModifier +import org.encryfoundation.common.modifiers.mempool.transaction.{Transaction, TransactionProtoSerializer} +import org.encryfoundation.common.utils.TaggedTypes.{ModifierId, ModifierTypeId} + +import scala.util.{Failure, Success, Try} class DownloadedModifiersValidator(modifierIdSize: Int, nodeViewHolder: ActorRef, @@ -50,7 +52,7 @@ class DownloadedModifiersValidator(modifierIdSize: Int, s"Sending validated modifier to NodeViewHolder" ) influxRef.foreach(_ ! ValidatedModifierFromNetwork(typeId)) - nodeViewHolder ! ModifierFromRemote(modifier) + nodeViewHolder ! ModifierFromRemote(ModifierWithBytes(modifier, bytes)) } else { logger.info( s"Modifier with id: ${modifier.encodedId} of type: $typeId invalid cause of:" + @@ -97,6 +99,8 @@ class DownloadedModifiersValidator(modifierIdSize: Int, object DownloadedModifiersValidator { + final case class ModifierWithBytes(modifier: PersistentModifier, bytes: Array[Byte]) + final case class ModifiersForValidating(remote: ConnectedPeer, typeId: ModifierTypeId, modifiers: Map[ModifierId, Array[Byte]]) diff --git a/src/main/scala/encry/view/ModifiersCache.scala b/src/main/scala/encry/view/ModifiersCache.scala index a4f7f21cc0..df2f5b866f 100644 --- a/src/main/scala/encry/view/ModifiersCache.scala +++ b/src/main/scala/encry/view/ModifiersCache.scala @@ -7,17 +7,19 @@ import org.encryfoundation.common.modifiers.PersistentModifier import org.encryfoundation.common.modifiers.history.Header import org.encryfoundation.common.utils.Algos import org.encryfoundation.common.utils.TaggedTypes.ModifierId + import scala.annotation.tailrec import scala.collection.immutable.SortedMap import scala.collection.concurrent.TrieMap import scala.collection.mutable import encry.EncryApp.settings +import encry.network.DownloadedModifiersValidator.ModifierWithBytes object ModifiersCache extends StrictLogging { private type Key = mutable.WrappedArray[Byte] - val cache: TrieMap[Key, PersistentModifier] = TrieMap[Key, PersistentModifier]() + val cache: TrieMap[Key, ModifierWithBytes] = TrieMap[Key, ModifierWithBytes]() private var headersCollection: SortedMap[Int, List[ModifierId]] = SortedMap[Int, List[ModifierId]]() private var isChainSynced = false @@ -30,10 +32,10 @@ object ModifiersCache extends StrictLogging { def contains(key: Key): Boolean = cache.contains(key) - def put(key: Key, value: PersistentModifier, history: History): Unit = if (!contains(key)) { - logger.debug(s"Put ${value.encodedId} of type ${value.modifierTypeId} to cache.") + def put(key: Key, value: ModifierWithBytes, history: History): Unit = if (!contains(key)) { + logger.debug(s"Put ${value.modifier.encodedId} of type ${value.modifier.modifierTypeId} to cache.") cache.put(key, value) - value match { + value.modifier match { case header: Header => val possibleHeadersAtCurrentHeight: List[ModifierId] = headersCollection.getOrElse(header.height, List()) logger.debug(s"possibleHeadersAtCurrentHeight(${header.height}): ${possibleHeadersAtCurrentHeight.map(Algos.encode).mkString(",")}") @@ -43,20 +45,20 @@ object ModifiersCache extends StrictLogging { case _ => } - if (size > history.settings.node.modifiersCacheSize) cache.find { case (_, modifier) => - history.testApplicable(modifier) match { + if (size > history.settings.node.modifiersCacheSize) cache.find { case (_, modifierWithBytes) => + history.testApplicable(modifierWithBytes.modifier) match { case Right(_) | Left(_: NonFatalValidationError) => false case _ => true } }.map(mod => remove(mod._1)) } - def remove(key: Key): Option[PersistentModifier] = { + def remove(key: Key): Option[ModifierWithBytes] = { logger.debug(s"Going to delete ${Algos.encode(key.toArray)}. Cache contains: ${cache.get(key).isDefined}.") cache.remove(key) } - def popCandidate(history: History): List[PersistentModifier] = synchronized { + def popCandidate(history: History): List[ModifierWithBytes] = synchronized { findCandidateKey(history).flatMap(k => remove(k)) } @@ -64,10 +66,11 @@ object ModifiersCache extends StrictLogging { def findCandidateKey(history: History): List[Key] = { - def isApplicable(key: Key): Boolean = cache.get(key).exists(modifier => history.testApplicable(modifier) match { - case Left(_: FatalValidationError) => remove(key); false - case Right(_) => true - case Left(_) => false + def isApplicable(key: Key): Boolean = cache.get(key) + .exists(modifierWithBytes => history.testApplicable(modifierWithBytes.modifier) match { + case Left(_: FatalValidationError) => remove(key); false + case Right(_) => true + case Left(_) => false }) def getHeadersKeysAtHeight(height: Int): List[Key] = { @@ -90,8 +93,8 @@ object ModifiersCache extends StrictLogging { }.toList def exhaustiveSearch: List[Key] = List(cache.find { case (k, v) => - v match { - case _: Header if history.getBestHeaderId.exists(headerId => headerId sameElements v.parentId) => true + v.modifier match { + case _: Header if history.getBestHeaderId.exists(headerId => headerId sameElements v.modifier.parentId) => true case _ => val isApplicableMod: Boolean = isApplicable(k) logger.debug(s"Try to apply: ${Algos.encode(k.toArray)} and result is: $isApplicableMod") @@ -113,7 +116,7 @@ object ModifiersCache extends StrictLogging { logger.debug(s"HeadersCollection size is: ${headersCollection.size}") logger.debug(s"Drop height ${history.getBestHeaderHeight + 1} in HeadersCollection") val res = value.map(cache.get(_)).collect { - case Some(v: Header) + case Some(ModifierWithBytes(v: Header, _)) if ((v.parentId sameElements history.getBestHeaderId.getOrElse(Array.emptyByteArray)) || (history.getBestHeaderHeight == history.settings.constants.PreGenesisHeight && (v.parentId sameElements Header.GenesisParentId) diff --git a/src/main/scala/encry/view/NodeViewHolder.scala b/src/main/scala/encry/view/NodeViewHolder.scala index 465e89621b..a408ddf0c2 100644 --- a/src/main/scala/encry/view/NodeViewHolder.scala +++ b/src/main/scala/encry/view/NodeViewHolder.scala @@ -13,6 +13,7 @@ import encry.EncryApp import encry.EncryApp.{miner, nodeViewSynchronizer, timeProvider} import encry.consensus.HistoryConsensus.ProgressInfo import encry.network.DeliveryManager.FullBlockChainIsSynced +import encry.network.DownloadedModifiersValidator.ModifierWithBytes import encry.network.NodeViewSynchronizer.ReceivableMessages._ import encry.network.PeerConnectionHandler.ConnectedPeer import encry.settings.{EncryAppSettings, LevelDBSettings} @@ -123,13 +124,14 @@ class NodeViewHolder(memoryPoolRef: ActorRef, nodeViewSynchronizer ! FastSyncDone context.become(defaultMessages(true)) } - case ModifierFromRemote(mod) => - val isInHistory: Boolean = nodeView.history.isModifierDefined(mod.id) - val isInCache: Boolean = ModifiersCache.contains(key(mod.id)) + case ModifierFromRemote(modifierWithBytes) => + val isInHistory: Boolean = nodeView.history.isModifierDefined(modifierWithBytes.modifier.id) + val isInCache: Boolean = ModifiersCache.contains(key(modifierWithBytes.modifier.id)) if (isInHistory || isInCache) - logger.info(s"Received modifier of type: ${mod.modifierTypeId} ${Algos.encode(mod.id)} " + + logger.info(s"Received modifier of type: ${modifierWithBytes.modifier.modifierTypeId} " + + s" ${Algos.encode(modifierWithBytes.modifier.id)} " + s"can't be placed into cache cause of: inCache: ${!isInCache}.") - else ModifiersCache.put(key(mod.id), mod, nodeView.history) + else ModifiersCache.put(key(modifierWithBytes.modifier.id), modifierWithBytes, nodeView.history) computeApplications() case lm: LocallyGeneratedModifier => @@ -485,7 +487,7 @@ object NodeViewHolder { case class CompareViews(source: ConnectedPeer, modifierTypeId: ModifierTypeId, modifierIds: Seq[ModifierId]) - final case class ModifierFromRemote(serializedModifiers: PersistentModifier) extends AnyVal + final case class ModifierFromRemote(serializedModifiers: ModifierWithBytes) extends AnyVal case class LocallyGeneratedModifier(pmod: PersistentModifier) From 5a4ce1853a34bdbe30bbd68846de78f9de9ed57f Mon Sep 17 00:00:00 2001 From: aleksandr Date: Fri, 29 Nov 2019 12:08:48 +0300 Subject: [PATCH 2/6] change nvh to work with modifierWithBytes --- .../scala/encry/view/NodeViewHolder.scala | 41 ++++++++++--------- .../scala/encry/view/history/History.scala | 4 +- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/main/scala/encry/view/NodeViewHolder.scala b/src/main/scala/encry/view/NodeViewHolder.scala index a408ddf0c2..883a5666a9 100644 --- a/src/main/scala/encry/view/NodeViewHolder.scala +++ b/src/main/scala/encry/view/NodeViewHolder.scala @@ -140,10 +140,11 @@ class NodeViewHolder(memoryPoolRef: ActorRef, logger.info(s"Got locally generated modifier ${lm.pmod.encodedId} of type ${lm.pmod.modifierTypeId}") lm.pmod match { case block: Block => - pmodModify(block.header, isLocallyGenerated = true) - pmodModify(block.payload, isLocallyGenerated = true) + + pmodModify(ModifierWithBytes(block.header, block.header.bytes), isLocallyGenerated = true) + pmodModify(ModifierWithBytes(block.payload, block.header.bytes), isLocallyGenerated = true) case anyMod => - pmodModify(anyMod, isLocallyGenerated = true) + pmodModify(ModifierWithBytes(anyMod, anyMod.bytes), isLocallyGenerated = true) } logger.debug(s"Time processing of msg LocallyGeneratedModifier with mod of type ${lm.pmod.modifierTypeId}:" + s" with id: ${Algos.encode(lm.pmod.id)} -> ${System.currentTimeMillis() - startTime}") @@ -182,7 +183,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, def computeApplications(): Unit = { val mods = ModifiersCache.popCandidate(nodeView.history) if (mods.nonEmpty) { - logger.info(s"mods: ${mods.map(mod => Algos.encode(mod.id))}") + logger.info(s"mods: ${mods.map(mod => Algos.encode(mod.modifier.id))}") mods.foreach(mod => pmodModify(mod)) computeApplications() } @@ -308,26 +309,26 @@ class NodeViewHolder(memoryPoolRef: ActorRef, } } - def pmodModify(pmod: PersistentModifier, isLocallyGenerated: Boolean = false): Unit = - if (!nodeView.history.isModifierDefined(pmod.id)) { - logger.debug(s"\nStarting to apply modifier ${pmod.encodedId} of type ${pmod.modifierTypeId} on nodeViewHolder to history.") + def pmodModify(pmod: ModifierWithBytes, isLocallyGenerated: Boolean = false): Unit = + if (!nodeView.history.isModifierDefined(pmod.modifier.id)) { + logger.debug(s"\nStarting to apply modifier ${pmod.modifier.encodedId} of type ${pmod.modifier.modifierTypeId} on nodeViewHolder to history.") val startAppHistory = System.currentTimeMillis() if (settings.influxDB.isDefined) context.system .actorSelection("user/statsSender") ! - StartApplyingModifier(pmod.id, pmod.modifierTypeId, System.currentTimeMillis()) + StartApplyingModifier(pmod.modifier.id, pmod.modifier.modifierTypeId, System.currentTimeMillis()) nodeView.history.append(pmod) match { case Right((historyBeforeStUpdate, progressInfo)) => - logger.info(s"Successfully applied modifier ${pmod.encodedId} of type ${pmod.modifierTypeId} on nodeViewHolder to history.") - logger.debug(s"Time of applying to history SUCCESS is: ${System.currentTimeMillis() - startAppHistory}. modId is: ${pmod.encodedId}") + logger.info(s"Successfully applied modifier ${pmod.modifier.encodedId} of type ${pmod.modifier.modifierTypeId} on nodeViewHolder to history.") + logger.debug(s"Time of applying to history SUCCESS is: ${System.currentTimeMillis() - startAppHistory}. modId is: ${pmod.modifier.encodedId}") influxRef.foreach { ref => - ref ! EndOfApplyingModifier(pmod.id) - val isHeader: Boolean = pmod match { + ref ! EndOfApplyingModifier(pmod.modifier.id) + val isHeader: Boolean = pmod.modifier match { case _: Header => true case _: Payload => false } ref ! ModifierAppendedToHistory(isHeader, success = true) } - logger.info(s"Going to apply modifications ${pmod.encodedId} of type ${pmod.modifierTypeId} on nodeViewHolder to the state: $progressInfo") + logger.info(s"Going to apply modifications ${pmod.modifier.encodedId} of type ${pmod.modifier.modifierTypeId} on nodeViewHolder to the state: $progressInfo") if (progressInfo.toApply.nonEmpty) { logger.info(s"\n progress info non empty. To apply: ${progressInfo.toApply.map(mod => Algos.encode(mod.id))}") val startPoint: Long = System.currentTimeMillis() @@ -342,7 +343,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, if (settings.influxDB.isDefined) context.actorSelection("/user/statsSender") ! StateUpdating(System.currentTimeMillis() - startPoint) influxRef.foreach { ref => - val isBlock: Boolean = pmod match { + val isBlock: Boolean = pmod.modifier match { case _: Payload => true case _ => false } @@ -352,7 +353,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, if (progressInfo.chainSwitchingNeeded) nodeView.wallet.rollback(VersionTag !@@ progressInfo.branchPoint.get).get blocksApplied.foreach(nodeView.wallet.scanPersistent) - logger.debug(s"\nPersistent modifier ${pmod.encodedId} applied successfully") + logger.debug(s"\nPersistent modifier ${pmod.modifier.encodedId} applied successfully") if (settings.influxDB.isDefined) newHistory.getBestHeader.foreach(header => context.actorSelection("/user/statsSender") ! BestHeaderInChain(header)) if (newHistory.isFullChainSynced) { @@ -362,17 +363,17 @@ class NodeViewHolder(memoryPoolRef: ActorRef, } updateNodeView(Some(newHistory), Some(newState), Some(nodeView.wallet)) } else { - if (!isLocallyGenerated) requestDownloads(progressInfo, Some(pmod.id)) - context.system.eventStream.publish(SemanticallySuccessfulModifier(pmod)) + if (!isLocallyGenerated) requestDownloads(progressInfo, Some(pmod.modifier.id)) + context.system.eventStream.publish(SemanticallySuccessfulModifier(pmod.modifier)) logger.info(s"\nProgress info is empty") updateNodeView(updatedHistory = Some(historyBeforeStUpdate)) } case Left(e) => - logger.debug(s"\nCan`t apply persistent modifier (id: ${pmod.encodedId}, contents: $pmod)" + + logger.debug(s"\nCan`t apply persistent modifier (id: ${pmod.modifier.encodedId}, contents: $pmod)" + s" to history caused $e") - context.system.eventStream.publish(SyntacticallyFailedModification(pmod, List(HistoryApplyError(e.getMessage)))) + context.system.eventStream.publish(SyntacticallyFailedModification(pmod.modifier, List(HistoryApplyError(e.getMessage)))) } - } else logger.info(s"\nTrying to apply modifier ${pmod.encodedId} that's already in history.") + } else logger.info(s"\nTrying to apply modifier ${pmod.modifier.encodedId} that's already in history.") def sendUpdatedInfoToMemoryPool(toRemove: Seq[PersistentModifier]): Unit = { val rolledBackTxs: IndexedSeq[Transaction] = toRemove diff --git a/src/main/scala/encry/view/history/History.scala b/src/main/scala/encry/view/history/History.scala index 95b170c6c5..2e51709466 100644 --- a/src/main/scala/encry/view/history/History.scala +++ b/src/main/scala/encry/view/history/History.scala @@ -1,6 +1,7 @@ package encry.view.history import java.io.File + import com.typesafe.scalalogging.StrictLogging import encry.consensus.HistoryConsensus.ProgressInfo import encry.settings._ @@ -17,6 +18,7 @@ import org.encryfoundation.common.utils.Algos import org.encryfoundation.common.utils.TaggedTypes.ModifierId import org.iq80.leveldb.Options import cats.syntax.either._ +import encry.network.DownloadedModifiersValidator.ModifierWithBytes import supertagged.@@ /** @@ -27,7 +29,7 @@ trait History extends HistoryModifiersValidator with HistoryModifiersProcessors var isFullChainSynced: Boolean = settings.node.offlineGeneration /** Appends modifier to the history if it is applicable. */ - def append(modifier: PersistentModifier): Either[Throwable, (History, ProgressInfo)] = { + def append(modifier: ModifierWithBytes): Either[Throwable, (History, ProgressInfo)] = { logger.info(s"Trying to append modifier ${Algos.encode(modifier.id)} of type ${modifier.modifierTypeId} to history") Either.catchNonFatal(modifier match { case header: Header => From d6295f59ca756f8d23bd845f00aef998e6c6d706 Mon Sep 17 00:00:00 2001 From: aleksandr Date: Fri, 29 Nov 2019 14:06:10 +0300 Subject: [PATCH 3/6] change history traits --- .../test/scala/benches/HistoryBenches.scala | 9 +++-- .../DownloadedModifiersValidator.scala | 5 +++ .../scala/encry/view/NodeViewHolder.scala | 28 +++---------- .../scala/encry/view/history/History.scala | 10 ++--- .../history/HistoryModifiersProcessors.scala | 39 ++++++++++--------- .../view/history/storage/HistoryStorage.scala | 15 ++++--- .../encry/modifiers/InstanceFactory.scala | 27 ++++++------- .../DeliveryManagerTests/DMUtils.scala | 7 +++- ...DeliveryManagerReRequestModifiesSpec.scala | 5 ++- .../DownloadedModifiersValidatorTests.scala | 32 +++++++-------- .../BasicNetworkMessagesProtoTest.scala | 5 ++- .../HistoryComparisionResultTest.scala | 21 +++++----- .../history/ModifiersValidationTest.scala | 11 +++--- 13 files changed, 105 insertions(+), 109 deletions(-) diff --git a/benchmarks/src/test/scala/benches/HistoryBenches.scala b/benchmarks/src/test/scala/benches/HistoryBenches.scala index 1107a902ba..21fe140fa1 100644 --- a/benchmarks/src/test/scala/benches/HistoryBenches.scala +++ b/benchmarks/src/test/scala/benches/HistoryBenches.scala @@ -5,6 +5,7 @@ import java.util.concurrent.TimeUnit import benches.HistoryBenches.HistoryBenchState import benches.Utils._ +import encry.network.DownloadedModifiersValidator.ModifierWithBytes import encry.view.history.History import encryBenchmark.BenchSettings import org.encryfoundation.common.modifiers.history.Block @@ -21,8 +22,8 @@ class HistoryBenches { bh.consume { val history: History = generateHistory(benchStateHistory.settings, getRandomTempDir) benchStateHistory.blocks.foldLeft(history) { case (historyL, block) => - historyL.append(block.header) - historyL.append(block.payload) + historyL.append(ModifierWithBytes(block.header)) + historyL.append(ModifierWithBytes(block.payload)) historyL.reportModifierIsValid(block) } history.closeStorage() @@ -70,8 +71,8 @@ object HistoryBenches extends BenchSettings { case ((prevHistory, prevBlock, vector), _) => val block: Block = generateNextBlockValidForHistory(prevHistory, 0, prevBlock, Seq(coinbaseTransaction(0))) - prevHistory.append(block.header) - prevHistory.append(block.payload) + prevHistory.append(ModifierWithBytes(block.header)) + prevHistory.append(ModifierWithBytes(block.payload)) (prevHistory.reportModifierIsValid(block), Some(block), vector :+ block) } resultedHistory._1.closeStorage() diff --git a/src/main/scala/encry/network/DownloadedModifiersValidator.scala b/src/main/scala/encry/network/DownloadedModifiersValidator.scala index b6c7c965a2..72a59fe1d9 100644 --- a/src/main/scala/encry/network/DownloadedModifiersValidator.scala +++ b/src/main/scala/encry/network/DownloadedModifiersValidator.scala @@ -17,6 +17,7 @@ import encry.view.NodeViewHolder.ReceivableMessages.ModifierFromRemote import encry.view.history.History import encry.view.mempool.MemoryPool.NewTransaction import org.encryfoundation.common.modifiers.PersistentModifier +import org.encryfoundation.common.modifiers.history.HistoryModifiersProtoSerializer import org.encryfoundation.common.modifiers.mempool.transaction.{Transaction, TransactionProtoSerializer} import org.encryfoundation.common.utils.TaggedTypes.{ModifierId, ModifierTypeId} @@ -100,6 +101,10 @@ class DownloadedModifiersValidator(modifierIdSize: Int, object DownloadedModifiersValidator { final case class ModifierWithBytes(modifier: PersistentModifier, bytes: Array[Byte]) + + object ModifierWithBytes { + def apply(modifier: PersistentModifier): ModifierWithBytes = new ModifierWithBytes(modifier, HistoryModifiersProtoSerializer.toProto(modifier)) + } final case class ModifiersForValidating(remote: ConnectedPeer, typeId: ModifierTypeId, diff --git a/src/main/scala/encry/view/NodeViewHolder.scala b/src/main/scala/encry/view/NodeViewHolder.scala index 883a5666a9..ac4a75041a 100644 --- a/src/main/scala/encry/view/NodeViewHolder.scala +++ b/src/main/scala/encry/view/NodeViewHolder.scala @@ -1,9 +1,6 @@ package encry.view import java.io.File -import java.nio.file.{Path, SimpleFileVisitor} - -import encry.view.state.avlTree.utils.implicits.Instances._ import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props} import akka.dispatch.{PriorityGenerator, UnboundedStablePriorityMailbox} import akka.pattern._ @@ -16,37 +13,25 @@ import encry.network.DeliveryManager.FullBlockChainIsSynced import encry.network.DownloadedModifiersValidator.ModifierWithBytes import encry.network.NodeViewSynchronizer.ReceivableMessages._ import encry.network.PeerConnectionHandler.ConnectedPeer -import encry.settings.{EncryAppSettings, LevelDBSettings} -import encry.stats.StatsSender._ -import encry.storage.VersionalStorage -import encry.storage.iodb.versionalIODB.IODBWrapper -import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} import encry.settings.EncryAppSettings import encry.stats.StatsSender._ import encry.utils.CoreTaggedTypes.VersionTag import encry.view.NodeViewErrors.ModifierApplyError.HistoryApplyError import encry.view.NodeViewHolder.ReceivableMessages._ import encry.view.NodeViewHolder._ -import encry.view.fast.sync.SnapshotProcessor -import encry.view.fast.sync.SnapshotHolder.{FastSyncDone, FastSyncFinished, HeaderChainIsSynced, RequiredManifestHeightAndId, SnapshotChunk, TreeChunks} +import encry.view.fast.sync.SnapshotHolder._ import encry.view.history.History import encry.view.mempool.MemoryPool.RolledBackTransactions -import encry.view.state.{UtxoState, _} +import encry.view.state.UtxoState import encry.view.state.avlTree.AvlTree import encry.view.wallet.EncryWallet -import io.iohk.iodb.{ByteArrayWrapper, LSMStore} -import encry.view.history.History -import encry.view.mempool.MemoryPool.RolledBackTransactions -import encry.view.state._ -import encry.view.wallet.EncryWallet +import io.iohk.iodb.ByteArrayWrapper import org.apache.commons.io.FileUtils import org.encryfoundation.common.modifiers.PersistentModifier import org.encryfoundation.common.modifiers.history._ import org.encryfoundation.common.modifiers.mempool.transaction.Transaction import org.encryfoundation.common.utils.Algos import org.encryfoundation.common.utils.TaggedTypes.{ADDigest, ModifierId, ModifierTypeId} -import org.iq80.leveldb.Options - import scala.collection.{IndexedSeq, Seq, mutable} import scala.concurrent.duration._ import scala.concurrent.{ExecutionContextExecutor, Future} @@ -140,11 +125,10 @@ class NodeViewHolder(memoryPoolRef: ActorRef, logger.info(s"Got locally generated modifier ${lm.pmod.encodedId} of type ${lm.pmod.modifierTypeId}") lm.pmod match { case block: Block => - - pmodModify(ModifierWithBytes(block.header, block.header.bytes), isLocallyGenerated = true) - pmodModify(ModifierWithBytes(block.payload, block.header.bytes), isLocallyGenerated = true) + pmodModify(ModifierWithBytes(block.header), isLocallyGenerated = true) + pmodModify(ModifierWithBytes(block.payload), isLocallyGenerated = true) case anyMod => - pmodModify(ModifierWithBytes(anyMod, anyMod.bytes), isLocallyGenerated = true) + pmodModify(ModifierWithBytes(anyMod), isLocallyGenerated = true) } logger.debug(s"Time processing of msg LocallyGeneratedModifier with mod of type ${lm.pmod.modifierTypeId}:" + s" with id: ${Algos.encode(lm.pmod.id)} -> ${System.currentTimeMillis() - startTime}") diff --git a/src/main/scala/encry/view/history/History.scala b/src/main/scala/encry/view/history/History.scala index 2e51709466..5bde943518 100644 --- a/src/main/scala/encry/view/history/History.scala +++ b/src/main/scala/encry/view/history/History.scala @@ -29,13 +29,13 @@ trait History extends HistoryModifiersValidator with HistoryModifiersProcessors var isFullChainSynced: Boolean = settings.node.offlineGeneration /** Appends modifier to the history if it is applicable. */ - def append(modifier: ModifierWithBytes): Either[Throwable, (History, ProgressInfo)] = { - logger.info(s"Trying to append modifier ${Algos.encode(modifier.id)} of type ${modifier.modifierTypeId} to history") - Either.catchNonFatal(modifier match { + def append(modWithBytes: ModifierWithBytes): Either[Throwable, (History, ProgressInfo)] = { + logger.info(s"Trying to append modifier ${Algos.encode(modWithBytes.modifier.id)} of type ${modWithBytes.modifier.modifierTypeId} to history") + Either.catchNonFatal(modWithBytes.modifier match { case header: Header => logger.info(s"Append header ${header.encodedId} at height ${header.height} to history") - (this, processHeader(header)) - case payload: Payload => (this, processPayload(payload)) + (this, processHeader(header, modWithBytes.bytes)) + case payload: Payload => (this, processPayload(payload, modWithBytes.bytes)) }) } diff --git a/src/main/scala/encry/view/history/HistoryModifiersProcessors.scala b/src/main/scala/encry/view/history/HistoryModifiersProcessors.scala index 8fceae6efe..9364dc8e1e 100644 --- a/src/main/scala/encry/view/history/HistoryModifiersProcessors.scala +++ b/src/main/scala/encry/view/history/HistoryModifiersProcessors.scala @@ -17,9 +17,9 @@ import org.encryfoundation.common.utils.Algos trait HistoryModifiersProcessors extends HistoryApi { - def processHeader(h: Header): ProgressInfo = getHeaderInfoUpdate(h) match { + def processHeader(h: Header, headerBytes: Array[Byte]): ProgressInfo = getHeaderInfoUpdate(h) match { case dataToUpdate: Seq[_] if dataToUpdate.nonEmpty => - historyStorage.bulkInsert(h.id, dataToUpdate, Seq(h)) + historyStorage.bulkInsert(h.id, dataToUpdate, Seq(h -> headerBytes)) getBestHeaderId match { case Some(bestHeaderId) => ProgressInfo(none, Seq.empty, if (!bestHeaderId.sameElements(h.id)) Seq.empty else Seq(h), toDownload(h)) @@ -29,24 +29,24 @@ trait HistoryModifiersProcessors extends HistoryApi { case _ => ProgressInfo(none, Seq.empty, Seq.empty, none) } - def processPayload(payload: Payload): ProgressInfo = getBlockByPayload(payload) + def processPayload(payload: Payload, payloadBytes: Array[Byte]): ProgressInfo = getBlockByPayload(payload) .flatMap{block => logger.info(s"proc block ${block.header.encodedId}!") - processBlock(block).some + processBlock(block, payloadBytes).some } - .getOrElse(putToHistory(payload)) + .getOrElse(putToHistory(payload, payloadBytes)) - private def processBlock(blockToProcess: Block): ProgressInfo = { + private def processBlock(blockToProcess: Block, payloadBytes: Array[Byte]): ProgressInfo = { logger.info(s"Starting processing block to history ||${blockToProcess.encodedId}||${blockToProcess.header.height}||") val bestFullChain: Seq[Block] = calculateBestFullChain(blockToProcess) addBlockToCacheIfNecessary(blockToProcess) bestFullChain.lastOption.map(_.header) match { case Some(header) if isValidFirstBlock(blockToProcess.header) => - processValidFirstBlock(blockToProcess, header, bestFullChain) + processValidFirstBlock(blockToProcess, header, bestFullChain, payloadBytes) case Some(header) if isBestBlockDefined && isBetterChain(header.id) => - processBetterChain(blockToProcess, header, Seq.empty, settings.node.blocksToKeep) + processBetterChain(blockToProcess, payloadBytes, header, Seq.empty, settings.node.blocksToKeep) case Some(_) => - nonBestBlock(blockToProcess) + nonBestBlock(blockToProcess, payloadBytes) case None => logger.debug(s"Best full chain is empty. Returning empty progress info") ProgressInfo(none, Seq.empty, Seq.empty, none) @@ -55,13 +55,15 @@ trait HistoryModifiersProcessors extends HistoryApi { private def processValidFirstBlock(fullBlock: Block, newBestHeader: Header, - newBestChain: Seq[Block]): ProgressInfo = { + newBestChain: Seq[Block], + payloadBytes: Array[Byte]): ProgressInfo = { logger.info(s"Appending ${fullBlock.encodedId} as a valid first block with height ${fullBlock.header.height}") - updateStorage(fullBlock.payload, newBestHeader.id) + updateStorage(fullBlock.payload, payloadBytes, newBestHeader.id) ProgressInfo(none, Seq.empty, newBestChain, none) } private def processBetterChain(fullBlock: Block, + payloadBytes: Array[Byte], newBestHeader: Header, newBestChain: Seq[Block], blocksToKeep: Int): ProgressInfo = getHeaderOfBestBlock.map { header => @@ -75,7 +77,7 @@ trait HistoryModifiersProcessors extends HistoryApi { .headers .flatMap(h => if (h == fullBlock.header) fullBlock.some else getBlockByHeader(h)) toApply.foreach(addBlockToCacheIfNecessary) - if (toApply.lengthCompare(newChain.length - 1) != 0) nonBestBlock(fullBlock) + if (toApply.lengthCompare(newChain.length - 1) != 0) nonBestBlock(fullBlock, payloadBytes) else { //application of this block leads to full chain with higher score logger.info(s"Appending ${fullBlock.encodedId}|${fullBlock.header.height} as a better chain") @@ -90,7 +92,7 @@ trait HistoryModifiersProcessors extends HistoryApi { ) val updatedHeadersAtHeightIds = newChain.headers.map(header => updatedBestHeaderAtHeightRaw(header.id, Height @@ header.height)).toList - updateStorage(fullBlock.payload, newBestHeader.id, updateBestHeader, updatedHeadersAtHeightIds) + updateStorage(fullBlock.payload, payloadBytes, newBestHeader.id, updateBestHeader, updatedHeadersAtHeightIds) if (blocksToKeep >= 0) { val lastKept: Int = blockDownloadProcessor.updateBestBlock(fullBlock.header) val bestHeight: Int = toApply.lastOption.map(_.header.height).getOrElse(0) @@ -101,10 +103,10 @@ trait HistoryModifiersProcessors extends HistoryApi { } }.getOrElse(ProgressInfo(none, Seq.empty, Seq.empty, none)) - private def nonBestBlock(fullBlock: Block): ProgressInfo = { + private def nonBestBlock(fullBlock: Block, payloadBytes: Array[Byte]): ProgressInfo = { //Orphaned block or full chain is not initialized yet logger.info(s"Process block to history ${fullBlock.encodedId}||${fullBlock.header.height}||") - historyStorage.bulkInsert(fullBlock.payload.id, Seq.empty, Seq(fullBlock.payload)) + historyStorage.bulkInsert(fullBlock.payload.id, Seq.empty, Seq(fullBlock.payload -> payloadBytes)) ProgressInfo(none, Seq.empty, Seq.empty, none) } @@ -192,8 +194,8 @@ trait HistoryModifiersProcessors extends HistoryApi { Seq(heightIdsKey(h.height) -> StorageValue @@ (headerIdsAtHeight(h.height) :+ h.id).flatten.toArray) } - private def putToHistory(payload: Payload): ProgressInfo = { - historyStorage.insertObjects(Seq(payload)) + private def putToHistory(payload: Payload, payloadBytes: Array[Byte]): ProgressInfo = { + historyStorage.insertObjects(Seq(payload -> payloadBytes)) ProgressInfo(none, Seq.empty, Seq.empty, none) } @@ -223,13 +225,14 @@ trait HistoryModifiersProcessors extends HistoryApi { } private def updateStorage(newModRow: PersistentModifier, + newModRowBytes: Array[Byte], bestFullHeaderId: ModifierId, updateHeaderInfo: Boolean = false, additionalIndexes: List[(Array[Byte], Array[Byte])] = List.empty): Unit = { val indicesToInsert: Seq[(Array[Byte], Array[Byte])] = if (updateHeaderInfo) Seq(BestBlockKey -> bestFullHeaderId, BestHeaderKey -> bestFullHeaderId) else Seq(BestBlockKey -> bestFullHeaderId) - historyStorage.bulkInsert(newModRow.id, indicesToInsert ++ additionalIndexes, Seq(newModRow)) + historyStorage.bulkInsert(newModRow.id, indicesToInsert ++ additionalIndexes, Seq(newModRow -> newModRowBytes)) } private def isValidFirstBlock(header: Header): Boolean = diff --git a/src/main/scala/encry/view/history/storage/HistoryStorage.scala b/src/main/scala/encry/view/history/storage/HistoryStorage.scala index 89811d80c8..be1f36a893 100644 --- a/src/main/scala/encry/view/history/storage/HistoryStorage.scala +++ b/src/main/scala/encry/view/history/storage/HistoryStorage.scala @@ -39,26 +39,26 @@ case class HistoryStorage(override val store: VersionalStorage) extends EncrySto case _: VLDBWrapper => store.get(StorageKey @@ id.untag(ModifierId)).map(_.tail) } - def insertObjects(objectsToInsert: Seq[PersistentModifier]): Unit = store match { + def insertObjects(objectsToInsert: Seq[(PersistentModifier, Array[Byte])]): Unit = store match { case iodb: IODBHistoryWrapper => iodb.objectStore.update( Random.nextLong(), Seq.empty, - objectsToInsert.map(obj => ByteArrayWrapper(obj.id) -> - ByteArrayWrapper(HistoryModifiersProtoSerializer.toProto(obj))) + objectsToInsert.map(obj => ByteArrayWrapper(obj._1.id) -> + ByteArrayWrapper(obj._2)) ) case _: VLDBWrapper => insert( - StorageVersion @@ objectsToInsert.head.id.untag(ModifierId), + StorageVersion @@ objectsToInsert.head._1.id, objectsToInsert.map(obj => - StorageKey @@ obj.id.untag(ModifierId) -> StorageValue @@ HistoryModifiersProtoSerializer.toProto(obj) + StorageKey @@ obj._1.id.untag(ModifierId) -> StorageValue @@ obj._2 ).toList, ) } def bulkInsert(version: Array[Byte], indexesToInsert: Seq[(Array[Byte], Array[Byte])], - objectsToInsert: Seq[PersistentModifier]): Unit = store match { + objectsToInsert: Seq[(PersistentModifier, Array[Byte])]): Unit = store match { case _: IODBHistoryWrapper => insertObjects(objectsToInsert) insert( @@ -66,13 +66,12 @@ case class HistoryStorage(override val store: VersionalStorage) extends EncrySto indexesToInsert.map { case (key, value) => StorageKey @@ key -> StorageValue @@ value }.toList ) case _: VLDBWrapper => - logger.info(s"Inserting2: $objectsToInsert") insert( StorageVersion @@ version, (indexesToInsert.map { case (key, value) => StorageKey @@ key -> StorageValue @@ value } ++ objectsToInsert.map { obj => - StorageKey @@ obj.id.untag(ModifierId) -> StorageValue @@ HistoryModifiersProtoSerializer.toProto(obj) + StorageKey @@ obj._1.id.untag(ModifierId) -> StorageValue @@ obj._2 }).toList ) } diff --git a/src/test/scala/encry/modifiers/InstanceFactory.scala b/src/test/scala/encry/modifiers/InstanceFactory.scala index cf88e5b04f..b501672f55 100755 --- a/src/test/scala/encry/modifiers/InstanceFactory.scala +++ b/src/test/scala/encry/modifiers/InstanceFactory.scala @@ -3,13 +3,14 @@ package encry.modifiers import encry.consensus.EncrySupplyController import encry.modifiers.mempool._ import encry.modifiers.state.Keys +import encry.network.DownloadedModifiersValidator.ModifierWithBytes import encry.settings.{EncryAppSettings, NodeSettings} import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} import encry.utils.{EncryGenerator, FileHelper, NetworkTimeProvider, TestHelper} import encry.view.history.History import encry.view.history.storage.HistoryStorage import io.iohk.iodb.LSMStore -import org.encryfoundation.common.modifiers.history.{Block, Header, Payload} +import org.encryfoundation.common.modifiers.history.{Block, Header, HistoryModifiersProtoSerializer, Payload} import org.encryfoundation.common.modifiers.mempool.transaction.{Input, Transaction} import org.encryfoundation.common.modifiers.state.box.{AssetBox, EncryProposition} import org.encryfoundation.common.modifiers.state.box.Box.Amount @@ -184,35 +185,35 @@ trait InstanceFactory extends Keys with EncryGenerator { val secondHistory = generateDummyHistory(settings) blocks._1.foldLeft(secondHistory) { case (prevHistory, blockToApply) => - prevHistory.append(blockToApply.header) - prevHistory.append(blockToApply.payload) + prevHistory.append(ModifierWithBytes(blockToApply.header)) + prevHistory.append(ModifierWithBytes(blockToApply.payload)) prevHistory.reportModifierIsValid(blockToApply) prevHistory } val nextBlockInFirstChain = generateNextBlock(histories.head) val nextBlockInSecondChain = generateNextBlock(secondHistory, additionalDifficulty = addDifficulty) - histories.head.append(nextBlockInFirstChain.header) - histories.head.append(nextBlockInFirstChain.payload) + histories.head.append(ModifierWithBytes(nextBlockInFirstChain.header)) + histories.head.append(ModifierWithBytes(nextBlockInFirstChain.payload)) val a = histories.head.reportModifierIsValid(nextBlockInFirstChain) - secondHistory.append(nextBlockInSecondChain.header) - secondHistory.append(nextBlockInSecondChain.payload) + secondHistory.append(ModifierWithBytes(nextBlockInSecondChain.header)) + secondHistory.append(ModifierWithBytes(nextBlockInSecondChain.payload)) val b = secondHistory.reportModifierIsValid(nextBlockInSecondChain) (List(a, b), (blocks._1 :+ nextBlockInFirstChain) -> List(nextBlockInSecondChain)) } else { val nextBlockInFirstChain = generateNextBlock(histories.head) val nextBlockInSecondChain = generateNextBlock(histories.last, additionalDifficulty = addDifficulty) - histories.head.append(nextBlockInFirstChain.header) - histories.head.append(nextBlockInFirstChain.payload) + histories.head.append(ModifierWithBytes(nextBlockInFirstChain.header)) + histories.head.append(ModifierWithBytes(nextBlockInFirstChain.payload)) val a = histories.head.reportModifierIsValid(nextBlockInFirstChain) - histories.last.append(nextBlockInSecondChain.header) - histories.last.append(nextBlockInSecondChain.payload) + histories.last.append(ModifierWithBytes(nextBlockInSecondChain.header)) + histories.last.append(ModifierWithBytes(nextBlockInSecondChain.payload)) val b = histories.last.reportModifierIsValid(nextBlockInSecondChain) (List(a, b), (blocks._1 :+ nextBlockInFirstChain) -> (blocks._2 :+ nextBlockInSecondChain)) } } else { val block: Block = generateNextBlock(histories.head) - histories.head.append(block.header) - histories.head.append(block.payload) + histories.head.append(ModifierWithBytes(block.header)) + histories.head.append(ModifierWithBytes(block.payload)) val a = histories.head.reportModifierIsValid(block) (List(a), (blocks._1 :+ block) -> blocks._2) } diff --git a/src/test/scala/encry/network/DeliveryManagerTests/DMUtils.scala b/src/test/scala/encry/network/DeliveryManagerTests/DMUtils.scala index 0bb95839cc..eadedfc9c5 100644 --- a/src/test/scala/encry/network/DeliveryManagerTests/DMUtils.scala +++ b/src/test/scala/encry/network/DeliveryManagerTests/DMUtils.scala @@ -1,12 +1,14 @@ package encry.network.DeliveryManagerTests import java.net.InetSocketAddress + import akka.actor.ActorSystem import akka.testkit.{TestActorRef, TestProbe} import encry.local.miner.Miner.{DisableMining, StartMining} import encry.modifiers.InstanceFactory import encry.network.DeliveryManager import encry.network.DeliveryManager.FullBlockChainIsSynced +import encry.network.DownloadedModifiersValidator.ModifierWithBytes import encry.network.NodeViewSynchronizer.ReceivableMessages.UpdatedHistory import encry.network.PeerConnectionHandler.{ConnectedPeer, Incoming} import encry.settings.EncryAppSettings @@ -14,6 +16,7 @@ import encry.view.history.History import org.encryfoundation.common.modifiers.history.Block import org.encryfoundation.common.network.BasicMessagesRepo.Handshake import org.encryfoundation.common.utils.TaggedTypes.ModifierId + import scala.collection.mutable import scala.collection.mutable.WrappedArray @@ -38,8 +41,8 @@ object DMUtils extends InstanceFactory { (0 until qty).foldLeft(history, List.empty[Block]) { case ((prevHistory, blocks), _) => val block: Block = generateNextBlock(prevHistory) - prevHistory.append(block.header) - prevHistory.append(block.payload) + prevHistory.append(ModifierWithBytes(block.header)) + prevHistory.append(ModifierWithBytes(block.payload)) val a = prevHistory.reportModifierIsValid(block) (a, blocks :+ block) } diff --git a/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerReRequestModifiesSpec.scala b/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerReRequestModifiesSpec.scala index 97acbe4976..a8c7f9f0a9 100644 --- a/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerReRequestModifiesSpec.scala +++ b/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerReRequestModifiesSpec.scala @@ -1,6 +1,7 @@ package encry.network.DeliveryManagerTests import java.net.InetSocketAddress + import akka.actor.ActorSystem import akka.testkit.{TestActorRef, TestProbe} import encry.consensus.HistoryConsensus @@ -8,6 +9,7 @@ import encry.consensus.HistoryConsensus.Older import encry.modifiers.InstanceFactory import encry.network.DeliveryManager import encry.network.DeliveryManagerTests.DMUtils._ +import encry.network.DownloadedModifiersValidator.ModifierWithBytes import encry.network.NetworkController.ReceivableMessages.DataFromPeer import encry.network.NodeViewSynchronizer.ReceivableMessages._ import encry.network.PeerConnectionHandler.{ConnectedPeer, Incoming} @@ -21,6 +23,7 @@ import org.encryfoundation.common.modifiers.mempool.transaction.Transaction import org.encryfoundation.common.network.BasicMessagesRepo.{Handshake, ModifiersNetworkMessage, RequestModifiersNetworkMessage} import org.encryfoundation.common.utils.TaggedTypes.ModifierId import org.scalatest.{BeforeAndAfterAll, Matchers, OneInstancePerTest, WordSpecLike} + import scala.concurrent.duration._ import scala.collection.mutable.WrappedArray @@ -133,7 +136,7 @@ class DeliveryManagerReRequestModifiesSpec extends WordSpecLike deliveryManager ! DataFromPeer(ModifiersNetworkMessage(Header.modifierTypeId, Map(headerIds.head -> headerBytes)), cp1) - history.append(blocks.head.header) + history.append(ModifierWithBytes(blocks.head.header)) val uHistory: History = history.reportModifierIsValid(blocks.head.header) deliveryManager ! UpdatedHistory(uHistory) diff --git a/src/test/scala/encry/network/DownloadedModifiersValidatorTests.scala b/src/test/scala/encry/network/DownloadedModifiersValidatorTests.scala index fdb907511f..5ac2cbfc83 100644 --- a/src/test/scala/encry/network/DownloadedModifiersValidatorTests.scala +++ b/src/test/scala/encry/network/DownloadedModifiersValidatorTests.scala @@ -3,27 +3,21 @@ package encry.network import java.net.InetSocketAddress import akka.actor.ActorSystem -import akka.testkit.{ TestActorRef, TestProbe } +import akka.testkit.{TestActorRef, TestProbe} import encry.modifiers.InstanceFactory import encry.network.BlackList.BanReason._ -import encry.network.DownloadedModifiersValidator.{ InvalidModifier, ModifiersForValidating } -import encry.network.NodeViewSynchronizer.ReceivableMessages.{ ChangedHistory, UpdatedHistory } -import encry.network.PeerConnectionHandler.{ ConnectedPeer, Outgoing } +import encry.network.DownloadedModifiersValidator.{InvalidModifier, ModifierWithBytes, ModifiersForValidating} +import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, UpdatedHistory} +import encry.network.PeerConnectionHandler.{ConnectedPeer, Outgoing} import encry.network.PeersKeeper.BanPeer import encry.settings.TestNetSettings import encry.view.NodeViewHolder.ReceivableMessages.ModifierFromRemote import encry.view.history.History import org.encryfoundation.common.crypto.equihash.EquihashSolution -import org.encryfoundation.common.modifiers.history.{ - Block, - Header, - HeaderProtoSerializer, - Payload, - PayloadProtoSerializer -} +import org.encryfoundation.common.modifiers.history.{Block, Header, HeaderProtoSerializer, Payload, PayloadProtoSerializer} import org.encryfoundation.common.network.BasicMessagesRepo.Handshake -import org.encryfoundation.common.utils.TaggedTypes.{ Height, ModifierId } -import org.scalatest.{ BeforeAndAfterAll, Matchers, OneInstancePerTest, WordSpecLike } +import org.encryfoundation.common.utils.TaggedTypes.{Height, ModifierId} +import org.scalatest.{BeforeAndAfterAll, Matchers, OneInstancePerTest, WordSpecLike} import scorex.crypto.hash.Digest32 import scorex.utils.Random @@ -150,7 +144,7 @@ class DownloadedModifiersValidatorTests Random.randomBytes() ) - history.append(header_first) + history.append(ModifierWithBytes(header_first)) nodeViewSync.send(downloadedModifiersValidator, UpdatedHistory(history)) @@ -196,8 +190,8 @@ class DownloadedModifiersValidatorTests val historyWith10Blocks = (0 until 10).foldLeft(history, Seq.empty[Block]) { case ((prevHistory, blocks), _) => val block: Block = generateNextBlock(prevHistory) - prevHistory.append(block.header) - prevHistory.append(block.payload) + prevHistory.append(ModifierWithBytes(block.header)) + prevHistory.append(ModifierWithBytes(block.payload)) (prevHistory.reportModifierIsValid(block), blocks :+ block) } @@ -213,7 +207,7 @@ class DownloadedModifiersValidatorTests .send(downloadedModifiersValidator, ModifiersForValidating(connectedPeer, Payload.modifierTypeId, mods)) peersKeeper.expectMsg(BanPeer(connectedPeer, CorruptedSerializedBytes)) - nodeViewHolder.expectMsg(ModifierFromRemote(payload)) + nodeViewHolder.expectMsg(ModifierFromRemote(ModifierWithBytes(payload))) } } @@ -221,8 +215,8 @@ class DownloadedModifiersValidatorTests (0 until qty).foldLeft(history, List.empty[Block]) { case ((prevHistory, blocks), _) => val block: Block = generateNextBlock(prevHistory) - prevHistory.append(block.header) - prevHistory.append(block.payload) + prevHistory.append(ModifierWithBytes(block.header)) + prevHistory.append(ModifierWithBytes(block.payload)) val a = prevHistory.reportModifierIsValid(block) (a, blocks :+ block) } diff --git a/src/test/scala/encry/network/NetworkMessagesProtoTest/BasicNetworkMessagesProtoTest.scala b/src/test/scala/encry/network/NetworkMessagesProtoTest/BasicNetworkMessagesProtoTest.scala index 7746922350..9ccd4caa7a 100644 --- a/src/test/scala/encry/network/NetworkMessagesProtoTest/BasicNetworkMessagesProtoTest.scala +++ b/src/test/scala/encry/network/NetworkMessagesProtoTest/BasicNetworkMessagesProtoTest.scala @@ -6,6 +6,7 @@ import NetworkMessagesProto.GeneralizedNetworkProtoMessage import NetworkMessagesProto.GeneralizedNetworkProtoMessage.InnerMessage import encry.EncryApp import encry.modifiers.InstanceFactory +import encry.network.DownloadedModifiersValidator.ModifierWithBytes import encry.settings.{EncryAppSettings, Settings} import org.encryfoundation.common.modifiers.history.{Block, Header, Payload} import org.encryfoundation.common.modifiers.mempool.transaction.Transaction @@ -21,8 +22,8 @@ class BasicNetworkMessagesProtoTest extends PropSpec with Matchers with Instance val testedBlocks: Vector[Block] = (0 until 10).foldLeft(generateDummyHistory(settings), Vector.empty[Block]) { case ((prevHistory, blocks), _) => val block: Block = generateNextBlock(prevHistory) - prevHistory.append(block.header) - prevHistory.append(block.payload) + prevHistory.append(ModifierWithBytes(block.header)) + prevHistory.append(ModifierWithBytes(block.payload)) (prevHistory.reportModifierIsValid(block), blocks :+ block) }._2 val testedTransaction: Seq[Transaction] = genValidPaymentTxs(10) diff --git a/src/test/scala/encry/view/history/HistoryComparisionResultTest.scala b/src/test/scala/encry/view/history/HistoryComparisionResultTest.scala index 9c448ee320..5b027dff15 100644 --- a/src/test/scala/encry/view/history/HistoryComparisionResultTest.scala +++ b/src/test/scala/encry/view/history/HistoryComparisionResultTest.scala @@ -3,6 +3,7 @@ package encry.view.history import encry.consensus.HistoryConsensus._ import encry.modifiers.InstanceFactory import encry.network.DeliveryManagerTests.DMUtils.generateBlocks +import encry.network.DownloadedModifiersValidator.ModifierWithBytes import encry.settings.{EncryAppSettings, TestNetSettings} import org.encryfoundation.common.modifiers.history.Block import org.encryfoundation.common.network.SyncInfo @@ -21,8 +22,8 @@ class HistoryComparisionResultTest extends WordSpecLike val syncInfo: SyncInfo = SyncInfo(blocks.map(_.header.id)) val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) => - hst.append(block.header) - hst.append(block.payload) + hst.append(ModifierWithBytes(block.header)) + hst.append(ModifierWithBytes(block.payload)) hst.reportModifierIsValid(block) } @@ -36,8 +37,8 @@ class HistoryComparisionResultTest extends WordSpecLike val syncInfo: SyncInfo = SyncInfo(blocks.map(_.header.id)) val updatedHistory: History = blocks.take(50).foldLeft(history) { case (hst, block) => - hst.append(block.header) - hst.append(block.payload) + hst.append(ModifierWithBytes(block.header)) + hst.append(ModifierWithBytes(block.payload)) hst.reportModifierIsValid(block) } @@ -51,8 +52,8 @@ class HistoryComparisionResultTest extends WordSpecLike val syncInfo: SyncInfo = SyncInfo(Seq.empty) val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) => - hst.append(block.header) - hst.append(block.payload) + hst.append(ModifierWithBytes(block.header)) + hst.append(ModifierWithBytes(block.payload)) hst.reportModifierIsValid(block) } @@ -67,8 +68,8 @@ class HistoryComparisionResultTest extends WordSpecLike val syncInfo: SyncInfo = SyncInfo(blocks.take(30).map(_.header.id)) val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) => - hst.append(block.header) - hst.append(block.payload) + hst.append(ModifierWithBytes(block.header)) + hst.append(ModifierWithBytes(block.payload)) hst.reportModifierIsValid(block) } @@ -86,8 +87,8 @@ class HistoryComparisionResultTest extends WordSpecLike ) val updatedHistory: History = fork._1.take(30).foldLeft(history) { case (hst, block) => - hst.append(block.header) - hst.append(block.payload) + hst.append(ModifierWithBytes(block.header)) + hst.append(ModifierWithBytes(block.payload)) hst.reportModifierIsValid(block) } diff --git a/src/test/scala/encry/view/history/ModifiersValidationTest.scala b/src/test/scala/encry/view/history/ModifiersValidationTest.scala index e879864ea8..5a4f510b98 100644 --- a/src/test/scala/encry/view/history/ModifiersValidationTest.scala +++ b/src/test/scala/encry/view/history/ModifiersValidationTest.scala @@ -2,6 +2,7 @@ package encry.view.history import encry.modifiers.InstanceFactory import encry.network.DeliveryManagerTests.DMUtils.generateBlocks +import encry.network.DownloadedModifiersValidator.ModifierWithBytes import encry.settings.{EncryAppSettings, TestNetSettings} import org.encryfoundation.common.modifiers.history.Block import org.scalatest.{Matchers, OneInstancePerTest, WordSpecLike} @@ -17,7 +18,7 @@ class ModifiersValidationTest extends WordSpecLike val newHistory: History = generateDummyHistory(testNetSettings) val genesisBlock: Block = generateGenesisBlock(testNetSettings.constants.GenesisHeight) newHistory.testApplicable(genesisBlock.header).isRight shouldBe true - newHistory.append(genesisBlock.header) + newHistory.append(ModifierWithBytes(genesisBlock.header)) val updatedHistory: History = newHistory.reportModifierIsValid(genesisBlock.header) updatedHistory.testApplicable(genesisBlock.payload).isRight shouldBe true } @@ -26,18 +27,18 @@ class ModifiersValidationTest extends WordSpecLike val newHistory: History = generateDummyHistory(testNetSettings) blocks.take(1).foldLeft(newHistory) { case (history, block) => history.testApplicable(block.header).isRight shouldBe true - history.append(block.header) + history.append(ModifierWithBytes(block.header)) history.reportModifierIsValid(block.header) history.testApplicable(block.payload).isRight shouldBe true - history.append(block.payload) + history.append(ModifierWithBytes(block.payload)) history.reportModifierIsValid(block) } blocks.takeRight(1).foldLeft(newHistory) { case (history, block) => history.testApplicable(block.header).isRight shouldBe false - history.append(block.header) + history.append(ModifierWithBytes(block.header)) history.reportModifierIsValid(block.header) history.testApplicable(block.payload).isRight shouldBe true - history.append(block.payload) + history.append(ModifierWithBytes(block.payload)) history.reportModifierIsValid(block) } } From c1eb685cb8833713d68c93377f097036bce4ceae Mon Sep 17 00:00:00 2001 From: aleksandr Date: Fri, 29 Nov 2019 14:16:00 +0300 Subject: [PATCH 4/6] fix --- .../scala/encry/view/history/storage/HistoryStorage.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/encry/view/history/storage/HistoryStorage.scala b/src/main/scala/encry/view/history/storage/HistoryStorage.scala index be1f36a893..c805e41974 100644 --- a/src/main/scala/encry/view/history/storage/HistoryStorage.scala +++ b/src/main/scala/encry/view/history/storage/HistoryStorage.scala @@ -45,13 +45,13 @@ case class HistoryStorage(override val store: VersionalStorage) extends EncrySto Random.nextLong(), Seq.empty, objectsToInsert.map(obj => ByteArrayWrapper(obj._1.id) -> - ByteArrayWrapper(obj._2)) + ByteArrayWrapper(obj._1.modifierTypeId +: obj._2)) ) case _: VLDBWrapper => insert( StorageVersion @@ objectsToInsert.head._1.id, objectsToInsert.map(obj => - StorageKey @@ obj._1.id.untag(ModifierId) -> StorageValue @@ obj._2 + StorageKey @@ obj._1.id.untag(ModifierId) -> StorageValue @@ (obj._1.modifierTypeId +: obj._2) ).toList, ) } @@ -71,7 +71,7 @@ case class HistoryStorage(override val store: VersionalStorage) extends EncrySto (indexesToInsert.map { case (key, value) => StorageKey @@ key -> StorageValue @@ value } ++ objectsToInsert.map { obj => - StorageKey @@ obj._1.id.untag(ModifierId) -> StorageValue @@ obj._2 + StorageKey @@ obj._1.id.untag(ModifierId) -> StorageValue @@ (obj._1.modifierTypeId +: obj._2) }).toList ) } From f9bb44c65465a427572fab9dc43b8e4a542fbb27 Mon Sep 17 00:00:00 2001 From: aleksandr Date: Fri, 29 Nov 2019 15:01:58 +0300 Subject: [PATCH 5/6] fix test --- .../encry/network/DownloadedModifiersValidatorTests.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/test/scala/encry/network/DownloadedModifiersValidatorTests.scala b/src/test/scala/encry/network/DownloadedModifiersValidatorTests.scala index 5ac2cbfc83..184b80dc30 100644 --- a/src/test/scala/encry/network/DownloadedModifiersValidatorTests.scala +++ b/src/test/scala/encry/network/DownloadedModifiersValidatorTests.scala @@ -199,15 +199,17 @@ class DownloadedModifiersValidatorTests nodeViewSync.send(downloadedModifiersValidator, UpdatedHistory(historyWith10Blocks._1)) + val payloadBytes = PayloadProtoSerializer.toProto(payload).toByteArray + val mods: Map[ModifierId, Array[Byte]] = (historyWith10Blocks._2.map( b => b.payload.id -> PayloadProtoSerializer.toProto(b.payload).toByteArray.reverse - ) :+ (payload.id -> PayloadProtoSerializer.toProto(payload).toByteArray)).toMap + ) :+ (payload.id -> payloadBytes)).toMap deliveryManager .send(downloadedModifiersValidator, ModifiersForValidating(connectedPeer, Payload.modifierTypeId, mods)) peersKeeper.expectMsg(BanPeer(connectedPeer, CorruptedSerializedBytes)) - nodeViewHolder.expectMsg(ModifierFromRemote(ModifierWithBytes(payload))) + nodeViewHolder.expectMsg(ModifierFromRemote(ModifierWithBytes(payload, payloadBytes))) } } From b2a18aeb1058b956901f567218e5fec4e02c0995 Mon Sep 17 00:00:00 2001 From: aleksandr Date: Fri, 29 Nov 2019 15:13:44 +0300 Subject: [PATCH 6/6] fix mod serialization in ModifierWithBytes.apply() --- .../scala/encry/network/DownloadedModifiersValidator.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/encry/network/DownloadedModifiersValidator.scala b/src/main/scala/encry/network/DownloadedModifiersValidator.scala index 72a59fe1d9..e5f84019c2 100644 --- a/src/main/scala/encry/network/DownloadedModifiersValidator.scala +++ b/src/main/scala/encry/network/DownloadedModifiersValidator.scala @@ -103,7 +103,8 @@ object DownloadedModifiersValidator { final case class ModifierWithBytes(modifier: PersistentModifier, bytes: Array[Byte]) object ModifierWithBytes { - def apply(modifier: PersistentModifier): ModifierWithBytes = new ModifierWithBytes(modifier, HistoryModifiersProtoSerializer.toProto(modifier)) + def apply(modifier: PersistentModifier): ModifierWithBytes = + new ModifierWithBytes(modifier, HistoryModifiersProtoSerializer.toProto(modifier).tail) } final case class ModifiersForValidating(remote: ConnectedPeer,