Decentralized Model Serving
EdgeServe is a distributed streaming system that can serve predictions from machine learning models in real time.
Machine learning deployments are becoming increasingly complex: a single model may source feature data from a variety of disparate services hosted on different devices.
EdgeServe addresses the need for a model serving system that can coordinate data from multiple data services and/or connect the results of multiple models. We have developed a research prototype with a Python programming interface, and we encourage external usage and collaboration for feedback.
git clone https://github.com/swjz/EdgeServe.git
cd EdgeServe
pip3 install -r requirements.txt
pip3 install -e .
EdgeServe depends on Apache Pulsar as its default message broker service. Here is an easy way to set up a standalone Apache Pulsar server within a Docker container:
docker run -it --name pulsar -p 6650:6650 -p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:3.1.0 bin/pulsar standalone
For more details, please refer to the Apache Pulsar documentation.
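Once the container is up, you can sanity-check the broker with the pulsar-client Python package. This is a minimal sketch; the topic name topic_in is just a placeholder:

import pulsar

# Connect to the standalone broker started above.
client = pulsar.Client('pulsar://localhost:6650')

# Publish a test message; this fails fast if the broker is unreachable.
producer = client.create_producer('topic_in')
producer.send(b'hello-pulsar')

client.close()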
EdgeServe has an iterator-based Python interface designed for streaming data. Each node can run multiple tasks at the same time, and each task can be replicated across multiple nodes, with the replicas consuming a shared message queue in parallel.
from edgeserve.data_source import DataSource

node = 'pulsar://server-address:6650'

def stream():
    while True:
        yield sensor.read()  # sensor is a stand-in for your own data producer

with DataSource(stream(), node, source_id='data1', topic='topic_in') as ds:
    # every next() call sends an example to the message queue
    next(ds)
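In a long-running deployment, you would typically drain the iterator in a loop rather than calling next() once. A minimal driver sketch:

with DataSource(stream(), node, source_id='data1', topic='topic_in') as ds:
    while True:
        next(ds)  # publish examples continuously until interrupted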
from edgeserve.compute import Compute

node = 'pulsar://server-address:6650'

def task(data1, data2, data3, data4):
    # actual prediction work here
    return aggregate_and_predict(data1, data2, data3, data4)

with Compute(task, node, worker_id='worker1', topic_in='topic_in', topic_out='topic_out',
             min_interval_ms=30, drop_if_older_than_ms=500) as compute:
    # every next() call consumes a message from the queue and runs task() on it;
    # the result is sent to another message queue (topic_out)
    next(compute)
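Because replicated workers consume the same shared queue, scaling out amounts to starting the same task on another node. A sketch, assuming a second machine running the same code with a distinct worker_id:

# On a second node: same task, same topics, different worker_id (sketch)
with Compute(task, node, worker_id='worker2', topic_in='topic_in', topic_out='topic_out',
             min_interval_ms=30, drop_if_older_than_ms=500) as compute:
    while True:
        next(compute)  # replicas share the load on topic_in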
from edgeserve.materialize import Materialize

node = 'pulsar://server-address:6650'

def final_process(y_pred):
    # reformat the prediction result, materialize and/or use it for decision making
    make_decisions(y_pred)

with Materialize(final_process, node, topic='topic_out') as materialize:
    # every next() call consumes a message from the prediction output queue
    # and runs final_process() on it
    next(materialize)
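As a purely illustrative example, final_process could append each prediction to a local log file; make_decisions above is a placeholder for whatever downstream logic you need:

def final_process(y_pred):
    # illustrative placeholder: persist each prediction locally
    with open('predictions.log', 'a') as f:
        f.write(f'{y_pred}\n')

with Materialize(final_process, node, topic='topic_out') as materialize:
    while True:
        next(materialize)  # consume predictions continuously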
EdgeServe is the product of a multi-year research effort in edge computing at the University of Chicago (ChiData).