An ETL Framework Powered by Apache Spark

November 09, 2015

Mike MacIntyre

When applying data science to cybersecurity, providing insightful and unbiased analytics on any data presents a variety of challenges. To name but a few, the supporting data platform must be ready to ingest data in virtually any format, deal with changing data rates and ultimately cater for a broad range of analytical use cases.

Apache Spark™ is an obvious choice as the backbone of an ETL architecture. Using Spark allows us to leverage in-house experience with the Hadoop ecosystem. While Apache Hadoop® is invaluable for data analysis and modelling, Spark enables a near real-time processing pipeline via its low-latency capabilities and streaming API.

Consider a classic use case of threat detection: correlating technical Threat Intelligence, i.e. Indicators of Compromise (IOCs) such as known bad IP addresses, with log data such as web proxy logs. Rather than discussing the details of malicious content identification (missing FQDN, domain masquerading, typosquatting, etc.), let's focus on an actual end-to-end workflow built on Spark for Threat Intelligence sweeping.

Proxy logs are continuously intercepted by a remote Apache Flume™ agent and streamed to a Kafka channel via a local cluster Flume agent.

agent.sources = avroSource
agent.channels = memoryChannel kafkaChannel
agent.sinks = hdfsSink
agent.sources.avroSource.type = avro
agent.sources.avroSource.channels = memoryChannel kafkaChannel
agent.sources.avroSource.bind = ec2-203-0-113-25.compute-1.amazonaws.com
agent.sources.avroSource.port = 4141
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path = /user/flume/proxy/%Y%m%d
agent.sinks.hdfsSink.hdfs.filePrefix = proxy-data
agent.sinks.hdfsSink.hdfs.fileSuffix = .txt.gz
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = CompressedStream
agent.sinks.hdfsSink.hdfs.codeC = gzip
agent.sinks.hdfsSink.hdfs.rollInterval = 0
agent.sinks.hdfsSink.hdfs.rollSize = 0
agent.sinks.hdfsSink.hdfs.rollCount = 100000
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100000
agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafkaChannel.brokerList = localhost:9092
agent.channels.kafkaChannel.topic = proxy
agent.channels.kafkaChannel.zookeeperConnect = localhost:2181,someotherhost:2181

The ETL framework makes use of Spark's seamless integration with Kafka to extract new log lines from the incoming messages. With streaming analysis, data can be processed as it becomes available, reducing the time to detection.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

trait StreamExtractor[T] {
  def extract: DStream[T]
}

class ProxyStreamExtractor(val ssc: StreamingContext) extends StreamExtractor[String] {
  override def extract: DStream[String] = {
    // Receiver-less approach: Kafka brokers instead of Zookeeper
    val kafkaParams = Map("metadata.broker.list" -> "somehost:9092,anotherhost:9092")
    val topics = Set("web.proxy")
    // Poll for new Kafka messages
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    // Return a new DStream comprising new lines from the proxy log (the Kafka message values)
    kafkaStream.map(_._2)
  }
}

As you can see, the workflow revolves around DStreams, a convenient abstraction over micro-batches of data represented as RDDs, which can in turn be exposed as DataFrames for SQL-style analysis.
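
To make the micro-batch nature concrete, the sketch below is purely illustrative: it counts the log lines arriving in each batch interval by operating on the underlying RDDs.

import org.apache.spark.streaming.dstream.DStream

// Illustrative only: each batch interval yields one RDD that can be processed like any other RDD
def logBatchSizes(lines: DStream[String]): Unit = {
  lines.foreachRDD { (rdd, time) =>
    println(s"Batch at $time contains ${rdd.count()} proxy log lines")
  }
}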

The extracted stream of textual data is then passed down the pipeline: log entries are interpreted and transformed into database records, and entries failing to meet the expectations set by a schema are marked as invalid:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.sql.Row

trait StreamTransformer[S, T] {
  def parse(input: DStream[S]): DStream[T]
}

class ProxyRowTransformer extends StreamTransformer[String, Row] {
  override def parse(stream: DStream[String]): DStream[Row] = {
    stream.transform(rdd =>
      rdd.map(line => {
        // Transform lines of text into SQL records, include failures
      })
    )
  }
}
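
As a rough illustration of what the elided parsing logic might look like, the sketch below splits a tab-delimited line into a fixed set of fields and appends a validity flag. The delimiter and field layout are assumptions, not the actual proxy log format.

import org.apache.spark.sql.Row

// Hypothetical layout: timestamp, source IP, destination IP, URL (tab-delimited)
def toRow(line: String): Row = {
  val fields = line.split("\t", -1)
  if (fields.length == 4)
    Row(fields(0), fields(1), fields(2), fields(3), true)  // valid record
  else
    Row(null, null, null, line, false)                     // marked as invalid, raw line preserved
}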

Successfully parsed input is analyzed and scrutinized by a Threat Intelligence rules engine.

import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.dstream.DStream

trait Analyser[T] {
  def analyse(input: DStream[T])
}

class WindowAnalyzer(val ctx: HiveContext) extends Analyser[Row] {
  override def analyse(input: DStream[Row]) {
    // Applies analysis on the sliding window, such as volume metrics
  }
}

class ThreatIntelSweeper(val ctx: HiveContext) extends Analyser[Row] {
  override def analyse(input: DStream[Row]) {
    // Malicious content identification
  }
}
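
The rules engine itself is beyond the scope of this post, but a minimal sketch of the IOC sweep could broadcast a set of known bad IP addresses and keep only the matching records. The indicator set and the column index of the destination IP are assumptions for illustration.

import org.apache.spark.sql.Row
import org.apache.spark.streaming.dstream.DStream

// Sketch only: match each record against a broadcast set of IP indicators
def sweep(records: DStream[Row], badIps: Set[String]): DStream[Row] = {
  records.transform { rdd =>
    val indicators = rdd.sparkContext.broadcast(badIps)
    rdd.filter(row => indicators.value.contains(row.getString(2)))  // assumes destination IP at index 2
  }
}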

Records, once analyzed, can be stored in any number of data stores, e.g. HDFS or HBase, for downstream analysis and presentation. Exceptions and lines that failed to parse can also be passed directly into the persistence layer.

import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.dstream.DStream

trait Loader[T] {
  def load(input: T)
}

class SuccessLoader(ctx: HiveContext) extends Loader[DStream[Row]] {
  override def load(stream: DStream[Row]) = {
    // Data aggregation and persistence
  }
}

class ErrorLoader(ctx: HiveContext) extends Loader[DStream[Row]] {
  override def load(stream: DStream[Row]) = {
    // Uploads failed log entries
  }
}
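
As an example of what the SuccessLoader body might do, the following sketch converts each micro-batch into a DataFrame and appends it as Parquet on HDFS. The schema and output path are hypothetical.

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.streaming.dstream.DStream

// Sketch only: persist each analysed micro-batch for downstream analysis and presentation
def persist(stream: DStream[Row], ctx: HiveContext, schema: StructType): Unit = {
  stream.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      ctx.createDataFrame(rdd, schema)
        .write.mode(SaveMode.Append)
        .parquet("/user/etl/proxy/analysed")  // hypothetical HDFS path
    }
  }
}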

In summary, Apache Spark has evolved into a full-fledged ETL engine, with DStreams and RDDs as ubiquitous data formats suitable for both streaming and batch processing. Only a thin abstraction layer is needed to arrive at a customizable framework. The example below depicts the idea of a fluent API backed by Apache Spark.

// App configuration, provides access to Spark context, Kafka brokers, topics etc.
val conf = Map(…)
// ETLEngine represents a custom API layer
val etl = new ETLEngine(conf)
// A stream of new proxy logs
val proxyStream: DStream[Row] = etl.extract(new ProxyStreamExtractor(ssc))
  .transform(new ProxyRowTransformer)
// Split into two distinct sets
val successFilter = proxyStream.filter(rdd => …)
val failedFilter = proxyStream.filter(rdd => …)
// Trigger a new workflow
// Hive context needed for SQL operations
val hiveCtx: HiveContext = …
// Start with successfully parsed lines
etl.process(successFilter)
  .analyse(new WindowAnalyzer(hiveCtx))
  .analyse(new ThreatIntelSweeper(hiveCtx))
  .load(new SuccessLoader(hiveCtx))
  // Now process the errors
  .process(failedFilter)
  .load(new ErrorLoader(hiveCtx))
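
ETLEngine is a custom layer whose internals aren't shown here; purely as an illustration of how thin such an abstraction can be, a hypothetical version might simply thread a DStream through the components defined above.

import org.apache.spark.streaming.dstream.DStream

// Hypothetical sketch of a thin fluent layer over the components above; not the actual ETLEngine
class SimpleEtl[T](val stream: DStream[T]) {
  def transform[U](t: StreamTransformer[T, U]): SimpleEtl[U] = new SimpleEtl(t.parse(stream))
  def analyse(a: Analyser[T]): SimpleEtl[T] = { a.analyse(stream); this }
  def load(l: Loader[DStream[T]]): SimpleEtl[T] = { l.load(stream); this }
}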