I recently read "Applying TDD to Scalding Development". The article is interesting and still relevant, but its example is implemented with the old Fields-based API.
Below is an updated version of the code that uses Scalding's type-safe API:
The job, its domain model, and its pipe operations:
/**
 * Scalding job that reads user events from a CSV and writes per-user session statistics.
 *
 * Arguments:
 *  - `input`: path of a CSV with rows `(timestamp, userId, <unused>, <unused>)`
 *  - `output`: path of the CSV to write `(userId, numberOfSessions, avgDuration, avgActions)` to
 *  - `maxIdleTimeInMillis` (optional, default "100"): idle gap that closes a session;
 *    parsed with `toInt`, so a non-numeric value fails at job construction
 */
class SessionsStatsJob(args: Args) extends Job(args) {
  import ExternalOperations._
  import SessionsStatsJob._

  val maxIdleTimeInMillis: Int = args.getOrElse("maxIdleTimeInMillis", "100").toInt
  val input: String = args("input")
  val output: String = args("output")

  // Only the first two columns (timestamp, user id) matter; the other two are discarded.
  val events: TypedPipe[Event] =
    TypedPipe
      .from(TypedCsv[(Int, Int, String, String)](input))
      .map { row => Event(row._1, row._2) }

  events
    .extractSessions(maxIdleTimeInMillis)
    .summarizeSessions
    .write(TypedCsv[(Int, Int, Double, Double)](output))
}
/** Domain model for the sessions job: raw events, sessions, and per-user session summaries. */
object SessionsStatsJob {

  /** A single user action: when it happened and which user performed it. */
  case class Event(timestamp: Int, userId: Int)

  /**
   * A run of actions by one user.
   *
   * @param start           timestamp of the first event in the session
   * @param end             timestamp of the most recently added event
   * @param numberOfActions number of events in the session
   */
  case class Session(userId: Int, start: Int, end: Int, numberOfActions: Int) {

    /** Time elapsed between the first and the last event of the session. */
    val duration: Int = end - start

    /**
     * Returns this session extended by `event`: its timestamp becomes the new end and the
     * action count grows by one. `event.userId` is not checked against `userId`.
     */
    def add(event: Event): Session =
      Session(userId, start, event.timestamp, numberOfActions + 1)
  }

  /**
   * Running totals over one user's sessions.
   *
   * @param numberOfSessions number of sessions folded in so far
   * @param duration         sum of those sessions' durations
   * @param numberOfActions  sum of those sessions' action counts
   */
  case class SessionsSummary(userId: Int, numberOfSessions: Int, duration: Int, numberOfActions: Int) {

    /** Returns these totals with `session` folded in. */
    def add(session: Session): SessionsSummary =
      SessionsSummary(
        userId,
        numberOfSessions + 1,
        duration + session.duration,
        numberOfActions + session.numberOfActions
      )

    /**
     * @return `(userId, numberOfSessions, average duration, average actions per session)`.
     *         Both averages divide by `numberOfSessions`, so they are only meaningful when it
     *         is positive.
     */
    def stats(): (Int, Int, Double, Double) = {
      val sessionCount = numberOfSessions.toDouble
      (userId, numberOfSessions, duration / sessionCount, numberOfActions / sessionCount)
    }
  }

  object SessionsSummary {

    /** Builds one summary per distinct user id found in `sessions`. */
    def fromSessions(sessions: Traversable[Session]): Traversable[SessionsSummary] =
      sessions.groupBy(_.userId).map { case (userId, userSessions) =>
        userSessions.foldLeft(SessionsSummary(userId, 0, 0, 0))(_ add _)
      }
  }
}
/**
 * Pipe-level operations used by the sessions job. They are exposed as extension methods on
 * `TypedPipe` through the implicit wrapper classes below, so each one can be tested in isolation.
 */
object ExternalOperations {
  import SessionsStatsJob._

  /** Adds `extractSessions` to a pipe of events. */
  trait ExtractSessions {
    def events: TypedPipe[Event]

    /**
     * Groups events by user and splits each user's events into sessions.
     *
     * An event joins the current session when it occurs less than `maxIdleTimeInMillis` after
     * that session's last event; otherwise it starts a new session.
     *
     * @param maxIdleTimeInMillis idle gap at or above which a new session begins
     * @return one list per user holding that user's sessions, most recent first
     */
    def extractSessions(maxIdleTimeInMillis: Int): TypedPipe[List[Session]] =
      events
        .groupBy { case Event(_, userId) => userId }
        // The fold below assumes each user's events arrive in chronological order, but Scalding
        // does not guarantee the order of values within a group: sort them by timestamp first.
        .sortBy(_.timestamp)
        .foldLeft(List.empty[Session]) {
          case (head :: tail, event @ Event(ts, _)) if (ts - head.end) < maxIdleTimeInMillis =>
            head.add(event) :: tail
          case (sessions, Event(timestamp, userId)) =>
            Session(userId, timestamp, timestamp, 1) :: sessions
        }
        .values
  }

  /** Enables `pipe.extractSessions(...)` on any `TypedPipe[Event]`. */
  implicit class ExtractSessionsWrapper(val events: TypedPipe[Event]) extends ExtractSessions

  /** Adds `summarizeSessions` to a pipe of per-user session lists. */
  trait SummarizeSessions {
    def sessions: TypedPipe[List[Session]]

    /**
     * Summarizes each list of sessions per user.
     *
     * @return `(userId, numberOfSessions, average duration, average actions per session)` rows
     */
    def summarizeSessions: TypedPipe[(Int, Int, Double, Double)] =
      sessions.flatMap(SessionsSummary.fromSessions).map(_.stats())
  }

  /** Enables `pipe.summarizeSessions` on any `TypedPipe[List[Session]]`. */
  implicit class SummarizeSessionsWrapper(val sessions: TypedPipe[List[Session]])
    extends SummarizeSessions
}
The accompanying specification:
/**
 * Specification for the sessions job: unit tests for each pipe operation, written with the
 * Given/When/Then DSL, plus an end-to-end `JobTest` of the whole job.
 */
class SessionsStatsJobSpec extends WordSpec with Matchers with TBddDsl {
  import SessionsStatsJob._

  "ExternalOperations" should {
    import ExternalOperations._

    "extract sessions" in {
      Given {
        List(
          Event(1001, 1),
          Event(1010, 1),
          Event(1019, 1),
          Event(1027, 1),
          Event(1027, 2),
          Event(1029, 2),
          Event(1037, 1)
        )
      } When {
        events: TypedPipe[Event] => events.extractSessions(10)
      } Then {
        // User 1's gap of exactly 10 (1027 -> 1037) is not below the limit, so it opens a
        // second session; each user's sessions are listed most recent first.
        _.toSet shouldBe Set(
          List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)),
          List(Session(2, 1027, 1029, 2))
        )
      }
    }

    "summarize sessions" in {
      Given {
        List(
          List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)),
          List(Session(2, 1027, 1029, 2))
        )
      } When {
        sessions: TypedPipe[List[Session]] => sessions.summarizeSessions
      } Then {
        // Doubles are written explicitly so the expected set has the same
        // (Int, Int, Double, Double) element type as the pipe, instead of being widened to AnyVal.
        _.toSet shouldBe Set((1, 2, 13.0, 2.5), (2, 1, 2.0, 2.0))
      }
    }
  }

  "SessionsStatsJob" should {
    "calculate average session duration and average number of actions per session" in {
      val events = Seq(
        (1000, 1, "get", "session1_event1"),
        (1010, 1, "click", "session1_event2"),
        (1020, 1, "click", "session1_event3"),
        (1030, 1, "put", "session1_event4"),
        (1100, 2, "get", "session1_event1"),
        (1110, 2, "click", "session1_event2"),
        (1160, 2, "put", "session1_event3"),
        (1200, 1, "get", "session2_event1"),
        (1210, 1, "click", "session2_event2"),
        (1260, 1, "put", "session2_event3")
      )

      // User 1: sessions of 30 and 60 with 4 and 3 actions; user 2: one session of 60 with 3.
      val expectedAverages = Set(
        (1, 2, 45.0, 3.5),
        (2, 1, 60.0, 3.0)
      )

      JobTest[SessionsStatsJob]
        .arg("input", "events")
        .arg("output", "averages")
        .arg("maxIdleTimeInMillis", "100")
        .source(TypedCsv[(Int, Int, String, String)]("events"), events)
        .sink[(Int, Int, Double, Double)](TypedCsv[(Int, Int, Double, Double)]("averages")) {
          _.toSet shouldBe expectedAverages
        }
        .run.finish
    }
  }
}
Comments
Post a Comment