Ingesting Objects from another Django Instance
This article is part of the Django Object Sync series.
We were working on an online-offline product that required changes made on a local server to be synced to a global central server. Changes could only be made on the local server, and a complete view of the database had to be available on the global central server. All of this in Django and Python 3.
This article, the 2nd in a series of 3, describes how to ingest objects from an operational log of the additions, changes and deletions to a database using Django. We begin by describing the task.
TL;DR Summary #
We created a repo for this code on GitHub. If you are short on time, head over there and hack on it.
Introduction #
Task #
Consider a stream of Django JSONified objects arriving in a file from another Django instance. We need to ingest these objects into our Django instance. These records might be either SAVE records or DELETE records. SAVE records can be for insertions or updates.
We must also make sure we do not miss any records in this stream. Re-ingesting records is not a problem, as it will not lead to inconsistency.
Ideas #
- The code must be part of the Django project so that it knows about the project's models, but it must not be an API, since ingestion happens from a file.
- The program must continue from where it last stopped so that no records are missed, even across restarts. It must store the last ingestion point of the file.
- What if we use a django-admin command to watch for changes in a file and ingest the changes?
Django-Admin Commands #
At the time of writing, the current Django version is 4.2. This version supports custom django-admin commands.
Applications can register their own actions with manage.py. For example, you might want to add a manage.py action for a Django app that you’re distributing. 1
Deep Dive #
Custom Django Admin Command #
We create a new django-admin command by creating a file at polls/management/commands/ingest_objects.py. For Django to discover the command, the management and commands directories must also contain __init__.py files.
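A minimal layout for the app might look like this (only the files relevant to the command are shown; polls is the example app used throughout this series):

polls/
    management/
        __init__.py
        commands/
            __init__.py
            ingest_objects.py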
In this file we create a new class as follows:
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    help = 'Read serialized objects from a file and save them, watching for changes in the file.'

    def add_arguments(self, parser):
        parser.add_argument('file_path', type=str, help='Path to the file to watch and ingest objects from.')

    def handle(self, *args, **options):
        pass
This is where we set up the command. We define that the command takes a file path as an argument, from which it must ingest the logs. The command can then be invoked through manage.py as shown below.
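For example (the oplog path here is a placeholder; the demo later in this article uses logs/oplog.log):

python manage.py ingest_objects /absolute/path/to/logs/oplog.log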
Watching for Changes #
Let's talk about how this command must work.
We must watch the file for changes and start ingestion as soon as possible. Note that we do not know whether a single log record has appeared or many, so we must ingest all the records that have arrived and then continue watching.
We will use the watchdog Python module to watch for changes in the file. Let's code this in the command's handle method.
# At the top of the module:
# import os, time
# from watchdog.observers import Observer

def handle(self, *args, **options):
    file_path = options['file_path']
    handler = ObjectIngestHandler(file_path)
    handler.ingest_objects()
    observer = Observer()
    observer.schedule(handler, os.path.dirname(file_path), recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
So we take the file_path to be observed and pass it to the handler, an instance of the ObjectIngestHandler class, which is then scheduled on a watchdog Observer that watches the file's directory.
Let us define what this class does.
ObjectIngestHandler #
# At the top of the module:
# import logging, traceback
# from watchdog.events import FileSystemEventHandler
# logger = logging.getLogger(__name__)

class ObjectIngestHandler(FileSystemEventHandler):
    def __init__(self, file_path):
        self.file_path = file_path

    def on_modified(self, event):
        # what happens when the file is modified
        if event.src_path == self.file_path and event.event_type == 'modified':
            self.ingest_objects()

    def on_created(self, event):
        # what happens when the file is created
        if event.src_path == self.file_path:
            self.ingest_objects()

    def ingest_objects(self):
        try:
            # this is where the file will be processed
            pass
        except EOFError:
            # the file has ended, so nothing more to do
            pass
        except Exception as e:
            logger.error(msg=f'ingest_objects {e}')
            logger.error(msg=f'ingest_object ERROR {traceback.format_exc()}')
Processing the File #
We must determine whether the file has already been processed and up to what position. If the file has been recreated, we start from position 0; otherwise we start from where processing last stopped.
Let's create a Django model for storing this information. I know, I know, this is a hack! But it is the simplest solution for now. We leave the more complex solutions as food for thought.
The following simple model tracks the position processed so far in each file.
from django.db import models

class ProcessedFile(models.Model):
    file_path = models.CharField(max_length=255, unique=True)
    last_processed_position = models.PositiveIntegerField(default=0)
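Since this introduces a new model, the usual migration step applies before running the command (run from the project root):

python manage.py makemigrations
python manage.py migrate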
We will use this model to save our progress through a file so that no records are missed. Follow the comments in the code to understand what is happening.
def ingest_objects(self):
    try:
        # fetch the last processed position of this file, if available
        # (ProcessedFile is the tracking model defined above, imported at the top of the module)
        processed_file, created = ProcessedFile.objects.get_or_create(file_path=self.file_path)
        # open the file
        with open(self.file_path, 'r') as file:
            file_size = os.fstat(file.fileno()).st_size
            # check whether the file has been truncated since the last run
            if processed_file.last_processed_position > file_size:
                processed_file.last_processed_position = 0
            # seek to the appropriate position
            file.seek(processed_file.last_processed_position)
            # read line by line till the end of the file
            line = file.readline()
            while line:
                # we will create ingest_object next
                ingest_object(line)
                line = file.readline()
            # find the end-of-file position and save it for the next run
            processed_file.last_processed_position = file.tell()
            processed_file.save()
    except EOFError:
        pass
    except Exception as e:
        logger.error(msg=f'ingest_objects {e}')
        logger.error(msg=f'ingest_object ERROR {traceback.format_exc()}')
Ingesting the Object #
Let us develop the ingest_object
method that de-serializes the JSON objects to django ORM.
For logs like:
1698395500.971553 SAVE [{"model": "polls.question", "pk": 1, "fields": {"id": "a3f4729c-b3c2-4264-82a6-8ef7abeaec96", "question_text": "What's new?", "pub_date": "2023-10-27T08:31:40.182Z"}}]
we need to split each log line on spaces and separate the timestamp, the operation and the actual object.
Let us ignore the timestamp for now.
s = line[line.find(' ')+1:]
Now the variable s holds the remaining line, formatted as OPERATION JSON-OBJECT.
Let us find the operation now.
op = s[:s.find(' ')]
The op variable now contains the operation.
Let us now find the object, and deserialize it.
data = serializers.deserialize('json', s[s.find(' ')+1:],)
The data variable now holds the deserialized objects as an iterable of DeserializedObject instances.
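As a quick illustration of what deserialize yields, here is a sketch reusing the JSON payload from the example log line above; it assumes the polls app from the demo project is installed:

from django.core import serializers

payload = '''[{"model": "polls.question", "pk": 1, "fields": {"id": "a3f4729c-b3c2-4264-82a6-8ef7abeaec96", "question_text": "What's new?", "pub_date": "2023-10-27T08:31:40.182Z"}}]'''

for obj in serializers.deserialize('json', payload):
    print(type(obj))      # django.core.serializers.base.DeserializedObject
    print(obj.object)     # the unsaved polls.Question model instance
    print(obj.object.pk)  # the primary key carried in the log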
For the DELETE operation we must find the relevant object by its ID and delete it from the ORM. The code is shown in the next code block.
# At the top of the module:
# from django.core import serializers

def ingest_object(line):
    try:
        s = line[line.find(' ')+1:]
        op = s[:s.find(' ')]
        data = serializers.deserialize('json', s[s.find(' ')+1:])
        if op == 'SAVE':
            for obj in data:
                obj.save()
                logger.info(msg=f'ingest_object SAVED Object ID {obj.object.id}')
        elif op == 'DELETE':
            for obj in data:
                o = obj.object.__class__.objects.get(id=obj.object.id)
                o.delete()
                logger.info(msg=f'ingest_object DELETE Object ID {obj.object.id}')
    except Exception as e:
        logger.error(msg=f'ingest_object {e} {line}')
        logger.error(msg=f'ingest_object ERROR {traceback.format_exc()}')
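As a quick manual check, the handler can also be exercised directly from the Django shell. This is a sketch: the import path follows the file location above and the oplog path is a placeholder.

from polls.management.commands.ingest_objects import ObjectIngestHandler

handler = ObjectIngestHandler('/absolute/path/to/logs/oplog.log')
handler.ingest_objects()  # processes any records currently in the file and remembers the position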
Bringing it all together, the code is available at Django Objects Sync.
Demo #
- Download or clone the code at Django Objects Sync.
- Switch to the blog branch if cloned.
- Install the pip requirements from the REQUIREMENTS.TXT file.
- Run the migrations for Django.
Run the following commands in the Django shell after all migrations have been applied to verify functionality.
from polls.models import Choice, Question
from django.utils import timezone

q = Question(question_text="What's new?", pub_date=timezone.now())
q.save()
q.choice_set.create(choice_text='Nothing', votes=0)
c = q.choice_set.create(choice_text='sky', votes=0)
c.delete()
c.save()
The following output will be created and similar logs will be appended to the file at logs/oplog.log.
1698395500.971553 SAVE [{"model": "polls.question", "pk": 1, "fields": {"id": "a3f4729c-b3c2-4264-82a6-8ef7abeaec96", "question_text": "What's new?", "pub_date": "2023-10-27T08:31:40.182Z"}}]
1698395542.826431 SAVE [{"model": "polls.choice", "pk": 1, "fields": {"id": "d6d2c0a0-6dfa-4515-875e-f1edfd3dbf01", "question": "a3f4729c-b3c2-4264-82a6-8ef7abeaec96", "choice_text": "Nothing", "votes": 0}}]
1698395568.857164 SAVE [{"model": "polls.choice", "pk": 2, "fields": {"id": "d8e48b8d-9ccb-4728-b90c-2833592b6fd2", "question": "a3f4729c-b3c2-4264-82a6-8ef7abeaec96", "choice_text": "sky", "votes": 0}}]
1698395573.036682 DELETE [{"model": "polls.choice", "pk": 2, "fields": {"id": "d8e48b8d-9ccb-4728-b90c-2833592b6fd2", "question": "a3f4729c-b3c2-4264-82a6-8ef7abeaec96", "choice_text": "sky", "votes": 0}}]
1698395577.627465 SAVE [{"model": "polls.choice", "pk": 3, "fields": {"id": "d8e48b8d-9ccb-4728-b90c-2833592b6fd2", "question": "a3f4729c-b3c2-4264-82a6-8ef7abeaec96", "choice_text": "sky", "votes": 0}}]
- Create another clone of the repository, or download and unzip the code at another location.
- Run the migrations at the new location.
- Find the absolute path of the oplog from the last run of the previous clone.
- Run the ingest_objects command using manage.py with that absolute path, as shown below.
- Inspect the Django ORM of the new clone using the Django shell or an SQLite viewer.
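For example (the absolute path below is a placeholder for the oplog of the first clone):

python manage.py ingest_objects /absolute/path/to/first-clone/logs/oplog.log

The command keeps watching the file and ingesting new records as they arrive, so objects created in the first clone should appear in the new clone's database shortly after they are logged.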
Conclusion #
So far so good. We now know how to create the oplog and how to ingest it. In the next blog in this series we will look at ways to stream these logs over to the new instance.