An ETL Framework Powered by Apache Spark
November 09, 2015
When applying data science to cybersecurity, providing insightful and unbiased analytics on any data presents a variety of challenges. To name but a few, the supporting data platform must be ready to ingest data in virtually any format, deal with changing data rates and ultimately cater for a broad range of analytical use cases.
Apache Spark™ is an obvious choice as the backbone of an ETL architecture. Using Spark allows us to leverage our in-house experience with the Hadoop ecosystem. While Apache Hadoop® is invaluable for data analysis and modelling, Spark enables a near real-time processing pipeline thanks to its low-latency capabilities and streaming API.
Consider a classic threat-detection use case: correlating technical Threat Intelligence, i.e. Indicators of Compromise (IOCs) such as known bad IP addresses, with log data such as web proxy logs. Rather than discussing the details of malicious content identification (missing FQDN, domain masquerading, typosquatting etc.), let’s focus on an actual end-to-end workflow built on Spark for Threat Intelligence sweeping.
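Proxy logs are continuously intercepted by a remote Apache Flume™ agent and streamed to a Kafka channel via a Flume agent running locally on the cluster. Besides feeding the Kafka channel, the local agent below also archives the raw logs to HDFS through a memory channel: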
```properties
# The Avro source receives events from the remote Flume agent and, with the
# default replicating selector, copies every event to both channels
agent.sources = avroSource
agent.channels = memoryChannel kafkaChannel
agent.sinks = hdfsSink

agent.sources.avroSource.type = avro
agent.sources.avroSource.channels = memoryChannel kafkaChannel
agent.sources.avroSource.bind = ec2-203-0-113-25.compute-1.amazonaws.com
agent.sources.avroSource.port = 4141

# HDFS sink: gzip-compressed text files in a daily directory, rolled every
# 100,000 events (time- and size-based rolling are disabled)
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path = /user/flume/proxy/%Y%m%d
agent.sinks.hdfsSink.hdfs.filePrefix = proxy-data
agent.sinks.hdfsSink.hdfs.fileSuffix = .txt.gz
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = CompressedStream
agent.sinks.hdfsSink.hdfs.codeC = gzip
agent.sinks.hdfsSink.hdfs.rollInterval = 0
agent.sinks.hdfsSink.hdfs.rollSize = 0
agent.sinks.hdfsSink.hdfs.rollCount = 100000
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true

# In-memory buffer feeding the HDFS sink
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100000

# Kafka channel: events are published to the "proxy" topic for Spark to consume
agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafkaChannel.brokerList = localhost:9092
agent.channels.kafkaChannel.topic = proxy
agent.channels.kafkaChannel.zookeeperConnect = localhost:2181,someotherhost:2181
```
The ETL framework relies on Spark's built-in Kafka integration to extract new log lines from the incoming messages. With streaming analysis, data can be processed as soon as it becomes available, which reduces the time to detection.
```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

trait StreamExtractor[T] {
  def extract: DStream[T]
}

class ProxyStreamExtractor(val ssc: StreamingContext) extends StreamExtractor[String] {
  override def extract: DStream[String] = {
    // Receiver-less (direct) approach: Kafka brokers instead of Zookeeper
    val kafkaParams = Map("metadata.broker.list" -> "somehost:9092,anotherhost:9092")
    // Topic(s) to consume; should match the topic the Flume Kafka channel publishes to
    val topics = Set("web.proxy")
    // Poll for new Kafka messages; each element is a (key, message) pair
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    // Return a new DStream comprising new lines from the proxy log
    kafkaStream.map { case (_, message) => … }
  }
}
```
As you can see, the workflow revolves around DStreams. A DStream is a convenient abstraction that represents a continuous stream as a sequence of micro-batches, each of which is an RDD.
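To make the micro-batch idea concrete, here is a toy model in plain Scala with no Spark involved. A stream is modelled as an ordered sequence of batches, and a stream-level `map` is simply applied batch by batch, which is roughly what `DStream.map` does to each underlying RDD. `ToyStream` is a hypothetical name used only for this illustration, and the `(key, message)` pairs stand in for what a direct Kafka stream yields.

```scala
// Toy model (not Spark): a stream is an ordered sequence of micro-batches.
// In Spark Streaming each batch would be an RDD; here it is a plain Vector.
final case class ToyStream[T](batches: Vector[Vector[T]]) {
  // Like DStream.map: apply the function to every element of every batch
  def map[U](f: T => U): ToyStream[U] = ToyStream(batches.map(_.map(f)))

  // Like DStream.transform: apply a whole-batch function to each batch
  def transform[U](f: Vector[T] => Vector[U]): ToyStream[U] = ToyStream(batches.map(f))
}

// Two micro-batches of (key, message) pairs, as a direct Kafka stream would yield
val kafkaLike = ToyStream(Vector(
  Vector(("k1", "line one"), ("k2", "line two")),
  Vector(("k3", "line three"))
))

// Keep only the message payload, batch by batch
val lines = kafkaLike.map(_._2)
println(lines.batches) // Vector(Vector(line one, line two), Vector(line three))
```

Batch boundaries are preserved by both operations, which is why per-batch work such as parsing or aggregation maps naturally onto this model.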
The resulting stream of log lines is then passed down the pipeline, where each entry is parsed and transformed into a database record. Entries that fail to meet the expectations set by a schema are marked as invalid rather than dropped:
```scala
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.sql.Row

trait StreamTransformer[S, T] {
  def parse(input: DStream[S]): DStream[T]
}

class ProxyRowTransformer extends StreamTransformer[String, Row] {
  override def parse(stream: DStream[String]): DStream[Row] = {
    stream
      .transform(rdd => {
        rdd.map(line => {
          // Transform lines of text into SQL records, including failures
          // (parsing logic not shown; ??? is a placeholder)
          ???
        })
      })
  }
}
```
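The parsing step itself is not shown in the post. The plain-Scala sketch below (runnable without Spark) illustrates one way to keep invalid lines instead of dropping them: `Either` carries either a well-formed record or the rejected line with a reason. It assumes a hypothetical six-field, whitespace-separated log format, since the real proxy schema is not given; `ProxyRecord` and `parseLine` are illustrative names only. In the Spark version, something like `parseLine` could be called inside the `rdd.map` above and its result converted to a `Row`.

```scala
import scala.util.Try

// Hypothetical simplified log format (an assumption; the real proxy schema is not shown):
//   <epochMillis> <clientIp> <destHost> <destIp> <httpStatus> <bytes>
final case class ProxyRecord(
    timestamp: Long,
    clientIp: String,
    destHost: String,
    destIp: String,
    status: Int,
    bytes: Long)

// Loose IPv4 check: four dot-separated numbers, each in 0..255
val Ipv4 = """(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})""".r

def isIpv4(s: String): Boolean = s match {
  case Ipv4(a, b, c, d) => Seq(a, b, c, d).forall(_.toInt <= 255)
  case _                => false
}

// Right = well-formed record; Left = invalid line plus the reason it was rejected
def parseLine(line: String): Either[String, ProxyRecord] =
  line.trim.split("\\s+") match {
    case Array(ts, client, host, dest, status, bytes) =>
      val record = for {
        t <- Try(ts.toLong).toOption
        s <- Try(status.toInt).toOption
        b <- Try(bytes.toLong).toOption
        if isIpv4(client) && isIpv4(dest)
      } yield ProxyRecord(t, client, host, dest, s, b)
      record.toRight(s"malformed field(s): $line")
    case fields =>
      Left(s"expected 6 fields, got ${fields.length}: $line")
  }

val sample = Seq(
  "1446000000000 10.0.0.5 example.com 203.0.113.7 200 5120",
  "1446000000001 10.0.0.6 truncated-line",
  "not-a-time 10.0.0.7 example.org 198.51.100.2 200 10"
)
val results = sample.map(parseLine)
val valid   = results.collect { case Right(r) => r }
val invalid = results.collect { case Left(e) => e }
```

Keeping the reason string alongside the rejected line makes the error stream useful for later inspection, which fits the idea of loading failures separately.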
Successfully parsed records are then scrutinized by a set of analysers: a sliding-window analyser for volume metrics and a Threat Intelligence rules engine.
```scala
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.dstream.DStream

trait Analyser[T] {
  def analyse(input: DStream[T]): Unit
}

class WindowAnalyzer(val ctx: HiveContext) extends Analyser[Row] {
  override def analyse(input: DStream[Row]): Unit = {
    // Applies analysis on the sliding window, such as volume metrics
  }
}

class ThreatIntelSweeper(val ctx: HiveContext) extends Analyser[Row] {
  override def analyse(input: DStream[Row]): Unit = {
    // Malicious content identification
  }
}
```
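The analysers' bodies are left out of the post. As a rough, Spark-free illustration of what they compute, the sketch below shows an IOC sweep (flag requests whose destination IP or domain is a known indicator) and a simple volume metric (requests per client inside a time window). `Hit`, `sweep`, `requestsPerClient` and the indicator values are hypothetical. In Spark Streaming the window would more likely be expressed with operations such as `DStream.window` or `reduceByKeyAndWindow`, and the IOC sets could be broadcast to the executors.

```scala
// Hypothetical record shape for the sketch; not the schema used in the post
final case class Hit(timestamp: Long, clientIp: String, destHost: String, destIp: String)

// Illustrative Indicators of Compromise (documentation-range IP, reserved TLD)
val badIps     = Set("203.0.113.66")
val badDomains = Set("malware.example")

// Threat Intelligence sweep: keep requests whose destination matches any IOC
def sweep(batch: Seq[Hit]): Seq[Hit] =
  batch.filter(h => badIps.contains(h.destIp) || badDomains.contains(h.destHost))

// Volume metric: requests per client inside the window [end - windowMs, end)
def requestsPerClient(batch: Seq[Hit], end: Long, windowMs: Long): Map[String, Int] =
  batch
    .filter(h => h.timestamp >= end - windowMs && h.timestamp < end)
    .groupBy(_.clientIp)
    .map { case (ip, reqs) => ip -> reqs.size }

val hits = Seq(
  Hit(1000L, "10.0.0.5", "example.com", "198.51.100.2"),
  Hit(2000L, "10.0.0.5", "malware.example", "198.51.100.9"),
  Hit(3000L, "10.0.0.6", "cdn.example.net", "203.0.113.66"),
  Hit(9000L, "10.0.0.6", "example.com", "198.51.100.2")
)

val alerts = sweep(hits)                                            // 2nd and 3rd requests
val volume = requestsPerClient(hits, end = 5000L, windowMs = 5000L) // 10.0.0.5 -> 2, 10.0.0.6 -> 1
```

Matching against a `Set` keeps each lookup constant-time, so the sweep stays linear in the number of requests per batch.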
Once analyzed, records can be stored in any number of data stores, e.g. HDFS or HBase, for downstream analysis and presentation. Exceptions and lines that could not be parsed can likewise be passed directly to the persistence layer.
```scala
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.dstream.DStream

trait Loader[T] {
  def load(input: T): Unit
}

class SuccessLoader(ctx: HiveContext) extends Loader[DStream[Row]] {
  override def load(stream: DStream[Row]): Unit = {
    // Data aggregation and persistence
  }
}

class ErrorLoader(ctx: HiveContext) extends Loader[DStream[Row]] {
  override def load(stream: DStream[Row]): Unit = {
    // Uploads failed log entries
  }
}
```
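The loaders' bodies are not shown either. In Spark they would typically write each micro-batch out via `DStream.foreachRDD`, for example to HDFS, HBase or a Hive table. The Spark-free sketch below only illustrates the layout idea: it appends records to a local directory tree partitioned by UTC day, mirroring the `%Y%m%d` path of the Flume HDFS sink, and keeps successes and failures in separate trees. `appendPartition` and the `success`/`errors` directory names are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// One directory per UTC day, mirroring the %Y%m%d layout of the Flume HDFS sink
val dayFormat: DateTimeFormatter =
  DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC)

// Appends lines to <root>/<kind>/<yyyyMMdd>/part.txt and returns that file
def appendPartition(root: Path, kind: String, epochMillis: Long, lines: Seq[String]): Path = {
  val day = dayFormat.format(Instant.ofEpochMilli(epochMillis))
  val dir = Files.createDirectories(root.resolve(kind).resolve(day))
  val file = dir.resolve("part.txt")
  val bytes = lines.map(_ + "\n").mkString.getBytes(StandardCharsets.UTF_8)
  Files.write(file, bytes, StandardOpenOption.CREATE, StandardOpenOption.APPEND)
}

// Successes and failures go to separate trees, like SuccessLoader and ErrorLoader
val root = Files.createTempDirectory("etl-sketch")
val ok   = appendPartition(root, "success", 1446000000000L, Seq("10.0.0.5\texample.com\t203.0.113.7"))
val bad  = appendPartition(root, "errors", 1446000000000L, Seq("truncated-line"))
```

Partitioning by day keeps downstream batch jobs cheap, since they can read only the directories for the dates they need.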
In summary, Apache Spark has evolved into a full-fledged ETL engine, with RDDs and DStreams as common abstractions for batch and streaming processing respectively. Only a thin abstraction layer is needed to build a customizable framework on top of it. The example below depicts the idea of a fluent API backed by Apache Spark.
```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.dstream.DStream

// App configuration, provides access to Spark context, Kafka brokers, topics etc.
val conf = Map(…)
// ETLEngine represents a custom API layer
val etl = new ETLEngine(conf)
// A stream of new proxy logs
val proxyStream: DStream[Row] = etl.extract(new ProxyStreamExtractor(…))
  .transform(new ProxyRowTransformer)
// Split into two distinct sets (the predicates receive individual rows)
val successFilter = proxyStream.filter(row => …)
val failedFilter = proxyStream.filter(row => …)
// Trigger a new workflow
// Hive context needed for SQL operations
val hiveCtx: HiveContext = …
// Start with successfully parsed lines
etl.process(successFilter)
  .analyse(new WindowAnalyzer(hiveCtx))
  .analyse(new ThreatIntelSweeper(hiveCtx))
  .load(new SuccessLoader(hiveCtx))
  // Now process the errors
  .process(failedFilter)
  .load(new ErrorLoader(hiveCtx))
```
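`ETLEngine` itself is not shown in the post. As a rough, Spark-free illustration of how thin such a layer can be, the sketch below threads plain Scala `Seq`s through the same extract, transform, analyse and load shape, and splits parsed and failed lines into two chains. All trait and class names here (`BatchExtractor`, `Pipeline`, `MiniEtl` and so on) are hypothetical stand-ins, not the author's API; in a Spark version each step would operate on a `DStream` instead.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical Spark-free counterparts of the post's traits, over plain Seqs
trait BatchExtractor[T]      { def extract: Seq[T] }
trait BatchTransformer[S, T] { def parse(input: Seq[S]): Seq[T] }
trait BatchAnalyser[T]       { def analyse(input: Seq[T]): Unit }
trait BatchLoader[T]         { def load(input: Seq[T]): Unit }

// Fluent wrapper: every step returns a Pipeline so calls can be chained
final class Pipeline[T](data: Seq[T]) {
  def transform[U](t: BatchTransformer[T, U]): Pipeline[U] = new Pipeline(t.parse(data))
  def collect[U](pf: PartialFunction[T, U]): Pipeline[U] = new Pipeline(data.collect(pf))
  // Analysers and loaders consume the data; the pipeline passes it through unchanged
  def analyse(a: BatchAnalyser[T]): Pipeline[T] = { a.analyse(data); this }
  def load(l: BatchLoader[T]): Pipeline[T] = { l.load(data); this }
}

object MiniEtl {
  def extract[T](e: BatchExtractor[T]): Pipeline[T] = new Pipeline(e.extract)
}

final case class Parsed(ip: String, host: String)

val source = new BatchExtractor[String] {
  def extract: Seq[String] = Seq("10.0.0.5 example.com", "garbage", "10.0.0.6 malware.example")
}

// Right = parsed record, Left = raw line that did not fit the expected shape
val parser = new BatchTransformer[String, Either[String, Parsed]] {
  def parse(input: Seq[String]): Seq[Either[String, Parsed]] = input.map { line =>
    line.split(" ") match {
      case Array(ip, host) => Right(Parsed(ip, host))
      case _               => Left(line)
    }
  }
}

val alerts = ListBuffer.empty[Parsed]
val sweeper = new BatchAnalyser[Parsed] {
  def analyse(input: Seq[Parsed]): Unit = alerts ++= input.filter(_.host == "malware.example")
}

val stored = ListBuffer.empty[String]
def loader[T](label: String): BatchLoader[T] = new BatchLoader[T] {
  def load(input: Seq[T]): Unit = stored ++= input.map(r => s"$label: $r")
}

val parsed = MiniEtl.extract(source).transform(parser)
// Successfully parsed lines: analyse, then load
parsed.collect { case Right(r) => r }
  .analyse(sweeper)
  .load(loader[Parsed]("ok"))
// Failed lines go straight to their own loader
parsed.collect { case Left(raw) => raw }
  .load(loader[String]("error"))
```

Because each step returns the pipeline, new analysers or loaders can be added to a chain without touching the others, which is the main appeal of the fluent style.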