Skip to main content

Testing Scalding type-safe API

I recently read "Applying TDD to Scalding Development". The article is interesting and still relevant, but its example is implemented with the old Fields-based API.

The following is an updated version of the code, using Scalding's type-safe API:

/**
 * Computes per-user session statistics from a CSV of events.
 *
 * Arguments:
 *  - `input`: CSV with rows `(timestamp, userId, _, _)`; only the first two columns are used.
 *  - `output`: CSV receiving `(userId, numberOfSessions, averageDuration, averageNumberOfActions)`.
 *  - `maxIdleTimeInMillis` (optional, default "100"): idle gap that splits a user's events into sessions.
 */
class SessionsStatsJob(args: Args) extends Job(args) {
  import ExternalOperations._
  import SessionsStatsJob._

  val maxIdleTimeInMillis: Int = args.getOrElse("maxIdleTimeInMillis", "100").toInt
  val input: String = args("input")
  val output: String = args("output")

  // Drop the two trailing columns; only the timestamp and the user id matter for sessions.
  val events: TypedPipe[Event] =
    TypedPipe
      .from(TypedCsv[(Int, Int, String, String)](input))
      .map { case (ts, user, _, _) => Event(ts, user) }

  events
    .extractSessions(maxIdleTimeInMillis)
    .summarizeSessions
    .write(TypedCsv[(Int, Int, Double, Double)](output))
}
object SessionsStatsJob {

  /** One user action read from the input: when it happened and which user performed it. */
  case class Event(timestamp: Int, userId: Int)

  /**
   * A run of events belonging to one user.
   *
   * @param userId          owner of the session
   * @param start           timestamp of the first event
   * @param end             timestamp of the latest event added
   * @param numberOfActions number of events in the session
   */
  case class Session(userId: Int, start: Int, end: Int, numberOfActions: Int) {

    /** Time elapsed between the first and the latest event (`end - start`). */
    val duration: Int = end - start

    /** Returns this session extended by `event`: `end` moves to the event's timestamp, one more action. */
    def add(event: Event): Session =
      copy(numberOfActions = numberOfActions + 1, end = event.timestamp)
  }

  /**
   * Running totals over all sessions of one user.
   *
   * @param userId           user the totals belong to
   * @param numberOfSessions sessions accumulated so far
   * @param duration         sum of the accumulated sessions' durations
   * @param numberOfActions  sum of the accumulated sessions' action counts
   */
  case class SessionsSummary(userId: Int, numberOfSessions: Int, duration: Int, numberOfActions: Int) {

    /** Returns the totals with `session` folded in. */
    def add(session: Session): SessionsSummary =
      SessionsSummary(
        userId = userId,
        numberOfSessions = numberOfSessions + 1,
        duration = duration + session.duration,
        numberOfActions = numberOfActions + session.numberOfActions
      )

    /**
     * @return `(userId, numberOfSessions, averageDuration, averageNumberOfActions)`.
     *         With zero sessions the averages are floating-point divisions by zero (NaN for 0/0).
     */
    def stats(): (Int, Int, Double, Double) = {
      val sessionCount = numberOfSessions.toDouble
      (userId, numberOfSessions, duration / sessionCount, numberOfActions / sessionCount)
    }
  }

  object SessionsSummary {

    /** Builds one summary per distinct user found in `sessions`. */
    def fromSessions(sessions: Traversable[Session]): Traversable[SessionsSummary] =
      sessions.groupBy(_.userId).map { case (user, userSessions) =>
        userSessions.foldLeft(SessionsSummary(user, 0, 0, 0))(_ add _)
      }
  }
}
/** Reusable pipeline steps, exposed as extension methods on typed pipes via the implicit wrappers below. */
object ExternalOperations {
  import SessionsStatsJob._

  /** Session extraction over a pipe of events. */
  trait ExtractSessions {
    def events: TypedPipe[Event]

    /**
     * Groups events per user and splits each user's events into sessions: an event joins the
     * current session when it comes less than `maxIdleTimeInMillis` after that session's end,
     * otherwise it starts a new one.
     *
     * @param maxIdleTimeInMillis idle gap (strictly exceeded or equalled) that closes a session
     * @return one list per user, most recent session first
     */
    def extractSessions(maxIdleTimeInMillis: Int): TypedPipe[List[Session]] =
      events
        .groupBy(_.userId)
        // The fold assumes chronological order. A plain grouping hands each reducer the user's
        // events in arbitrary order, so a secondary sort on the timestamp is required.
        .sortBy(_.timestamp)
        .foldLeft(List.empty[Session]) {
          case (current :: older, event) if (event.timestamp - current.end) < maxIdleTimeInMillis =>
            current.add(event) :: older
          case (sessions, Event(timestamp, userId)) =>
            Session(userId, timestamp, timestamp, 1) :: sessions
        }
        .values
  }

  implicit class ExtractSessionsWrapper(val events: TypedPipe[Event]) extends ExtractSessions

  /** Per-user summary statistics over pipes of session lists. */
  trait SummarizeSessions {
    def sessions: TypedPipe[List[Session]]

    /** @return `(userId, numberOfSessions, averageDuration, averageNumberOfActions)` per user. */
    def summarizeSessions: TypedPipe[(Int, Int, Double, Double)] =
      sessions.flatMap(SessionsSummary.fromSessions).map(_.stats())
  }

  implicit class SummarizeSessionsWrapper(val sessions: TypedPipe[List[Session]])
    extends SummarizeSessions
}
/**
 * Specs for the session pipeline. The two `ExternalOperations` steps are checked in isolation with
 * the Given/When/Then DSL; the whole `SessionsStatsJob` is checked end to end with `JobTest`.
 */
class SessionsStatsJobSpec extends WordSpec with Matchers with TBddDsl {
  import SessionsStatsJob._

  "ExternalOperations" should {
    import ExternalOperations._

    "extract sessions" in {
      Given {
        List(
          Event(1001, 1),
          Event(1010, 1),
          Event(1019, 1),
          Event(1027, 1),
          Event(1027, 2),
          Event(1029, 2),
          Event(1037, 1)
        )
      } When {
        events: TypedPipe[Event] => events.extractSessions(10)
      } Then {
        // With a 10 ms limit, user 1's 1027 -> 1037 gap (exactly 10) is not "< 10", so it opens a
        // new session. Each user's list is most recent session first.
        _.toSet shouldBe Set(
          List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)),
          List(Session(2, 1027, 1029, 2))
        )
      }
    }

    "summarize sessions" in {
      Given {
        List(
          List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)),
          List(Session(2, 1027, 1029, 2))
        )
      } When {
        sessions: TypedPipe[List[Session]] => sessions.summarizeSessions
      } Then {
        // Averages are written as Double literals to match the (Int, Int, Double, Double) output.
        // Int literals would widen the expected element type to (Int, Int, AnyVal, AnyVal).
        _.toSet shouldBe Set((1, 2, 13.0, 2.5), (2, 1, 2.0, 2.0))
      }
    }
  }

  "SessionsStatsJob" should {
    "calculate average session duration and average number of actions per session" in {
      val events = Seq(
        (1000, 1, "get", "session1_event1"),
        (1010, 1, "click", "session1_event2"),
        (1020, 1, "click", "session1_event3"),
        (1030, 1, "put", "session1_event4"),
        (1100, 2, "get", "session1_event1"),
        (1110, 2, "click", "session1_event2"),
        (1160, 2, "put", "session1_event3"),
        (1200, 1, "get", "session2_event1"),
        (1210, 1, "click", "session2_event2"),
        (1260, 1, "put", "session2_event3")
      )
      // User 1: sessions 1000-1030 (30 ms, 4 actions) and 1200-1260 (60 ms, 3 actions).
      // User 2: one session 1100-1160 (60 ms, 3 actions).
      val expectedAverages = Set(
        (1, 2, 45.0, 3.5),
        (2, 1, 60.0, 3.0)
      )
      JobTest[SessionsStatsJob]
        .arg("input", "events")
        .arg("output", "averages")
        .arg("maxIdleTimeInMillis", "100")
        .source(TypedCsv[(Int, Int, String, String)]("events"), events)
        .sink[(Int, Int, Double, Double)](TypedCsv[(Int, Int, Double, Double)]("averages")) {
          _.toSet shouldBe expectedAverages
        }
        .run.finish
    }
  }
}

Comments

Popular posts from this blog

IntelliJ IDEA not starting: Initial heap size set to a larger value than the maximum heap size

IntelliJ IDEA didn't want to start this morning. I got the following error when trying to start it from a shell: Error occurred during initialization of VM Initial heap size set to a larger value than the maximum heap size What happened is that IntelliJ IDEA loaded the JVM options from the new custom vmoptions file in the config directory. On Windows: %APPDATA%\JetBrains\IntelliJIdea2020.1\idea64.exe.vmoptions (%APPDATA% already points to ...\AppData\Roaming) On macOS: ~/Library/Application Support/JetBrains/IntelliJIdea2020.1/idea.vmoptions This file was not updated properly when I updated IntelliJ IDEA. It contained an initial heap larger than the maximum heap: -Xms4g -Xmx2048m Fixed the issue by editing this file: -Xms4g -Xmx4g Source: https://intellij-support.jetbrains.com/hc/en-us/community/posts/360004701620-idea-vmoptions-not-used-by-default  

How to mavenify a Play 2 application

This article is for developers who want to develop web-apps with Play 2 Framework but are required to integrate these new web-apps into an existing build process based on Maven. Lucky you, someone already started a project to do this: play2-maven-plugin. Unfortunately, this Maven plugin does not support hot reloading yet, which makes the development process painful. In short, you still need SBT to enjoy Play 2's hot reloading feature... but you do not want to have to maintain both Maven and SBT configurations. The trick is to configure SBT from Maven pom files with sbt-pom-reader. This is how you need to configure your play2-maven project: <my-maven-project>/ pom.xml <- Your maven build build.sbt <- the sbt Play 2 configuration project/ build.properties <- the sbt version specification build.scala <- the sbt build definition plugins.sbt <- the sbt plugin configuration ...

Accessing a Docker container running in a Docker-Machine on localhost

On Linux you can access your running Docker container on localhost or remotely by publishing the desired port. On macOS, publishing a port only gives you access to the Docker container from the Docker-Machine it is running on, i.e. from docker-machine ip <machine_name>. To access it on localhost, you can use ssh port forwarding: docker-machine ssh <machine_name> -fNTL <local_port>:localhost:<machine_port> You can now access your Docker container on localhost:<local_port>. Bonus: Accessing your Docker container from a remote computer. By default, with ssh -L, the local port is bound for local use only. You can use the bind_address option to make your Docker container available publicly: docker-machine ssh <machine_name> -fNTL \*:<local_port>:localhost:<machine_port> You can now access your Docker container on <your_ip>:<local_port>.