refactor: split the monolithic scraper into modules with separate case-status and orders entry points
Diff
scrape_ecourtindia_v6/.gitignore                     |   1 +
scrape_ecourtindia_v6/main.py                        |  86 --------------------------------------------------------------------------------
scrape_ecourtindia_v6/scrape_case_status.py          |  89 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
scrape_ecourtindia_v6/scrape_orders.py               | 100 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
scrape_ecourtindia_v6/scraper.py                     | 221 --------------------------------------------------------------------------------
scrape_ecourtindia_v6/csv/.keep                      |   0
scrape_ecourtindia_v6/modules/scraper.py             |  61 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
scrape_ecourtindia_v6/modules/scraper_case_status.py | 156 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
scrape_ecourtindia_v6/modules/scraper_orders.py      |  29 +++++++++++++++++++++++++++++
9 files changed, 436 insertions(+), 307 deletions(-)
@@ -1,1 +1,2 @@
courts.csv
csv/*
@@ -1,86 +1,0 @@
import csv
from scraper import Scraper
from tinydb import TinyDB
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

db = TinyDB('db.json')
SCRAPE_ESTABLISHMENTS = True

class ThreadSafeCSVWriter:
    def __init__(self, filename):
        self.file = open(filename, 'w', newline='')
        self.writer = csv.writer(self.file)
        self.lock = threading.Lock()

    def writerow(self, row):
        with self.lock:
            self.writer.writerow(row)

    def close(self):
        self.file.close()

def scrape_state_thread(state, config, csv_writer):
    scraper = Scraper(db, config)
    scraper.close_modal()
    try:
        for district in scraper.scrape_districts(state):
            for cmplx in scraper.scrape_complexes(state, district):
                if SCRAPE_ESTABLISHMENTS:
                    for establishment in scraper.scrape_establishments(state, district, cmplx):
                        csv_writer.writerow([ state, district, cmplx, establishment ])
                else:
                    csv_writer.writerow([ state, district, cmplx ])
    except Exception as e:
        print(f"Error scraping {state}: {e}")
    finally:
        scraper.driver.quit()

def scrape_courts():
    config = {}
    m = Scraper(db, config)
    m.close_modal()
    csv_writer = ThreadSafeCSVWriter('courts.csv')
    csv_writer.writerow(['State', 'District', 'Complex'])
    states = m.scrape_states()
    m.driver.close()
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [
            executor.submit(scrape_state_thread, state, config, csv_writer)
            for state in states
        ]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"A thread encountered an error: {e}")
    csv_writer.close()

def scrape_orders():
    config = {}
    m = Scraper(db, config)
    m.close_modal()
    config['state'] = input('Select a state: ')
    config['district'] = input('Select a district: ')
    config['court_complex'] = input('Select a court complex: ')
    config['court_establishment'] = input('Select a court establishment: ')
    config['act'] = input('Select an act: ')
    m.select_court()
    m.goto_acts()
    m.select_act()
    m.handle_table()
    m.driver.close()

if __name__ == '__main__':
    scrape_courts()
@@ -1,0 +1,89 @@
import csv
from modules.scraper_case_status import ScraperCaseStatus
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

SCRAPE_ESTABLISHMENTS = True

class ThreadSafeCSVWriter:
    """Serializes writerow() calls with a lock; csv.writer is not thread-safe."""

    def __init__(self, filename):
        self.file = open(filename, 'w', newline='')
        self.writer = csv.writer(self.file)
        self.lock = threading.Lock()

    def writerow(self, row):
        with self.lock:
            self.writer.writerow(row)

    def close(self):
        self.file.close()

def scrape_state_thread(state, config, csv_writer):
    # One browser session per worker thread.
    scraper = ScraperCaseStatus(config)
    scraper.close_modal()
    try:
        scraper.select('sess_state_code', state)
        for district in scraper.scrape_districts():
            scraper.select('sess_dist_code', district)
            for cmplx in scraper.scrape_complexes():
                scraper.select('court_complex_code', cmplx)
                if SCRAPE_ESTABLISHMENTS:
                    # scrape_establishments() already returns a list; the
                    # establishments follow the first three columns of the row.
                    establishments = scraper.scrape_establishments()
                    csv_writer.writerow([ state, district, cmplx ] + establishments)
                else:
                    csv_writer.writerow([ state, district, cmplx ])
    except Exception as e:
        print(f"Error scraping {state}: {e}")
    finally:
        scraper.driver.quit()

def scrape_courts():
    config = {}
    m = ScraperCaseStatus(config)
    m.close_modal()
    csv_writer = ThreadSafeCSVWriter('csv/courts.csv')
    csv_writer.writerow(['State', 'District', 'Complex'])
    states = m.scrape_states()
    m.driver.quit()  # quit(), not close(), so the geckodriver process exits too
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(scrape_state_thread, state, config, csv_writer)
            for state in states
        ]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"A thread encountered an error: {e}")
    csv_writer.close()

def scrape_orders():
    config = {}
    m = ScraperCaseStatus(config)
    m.close_modal()
    config['state'] = input('Select a state: ')
    config['district'] = input('Select a district: ')
    config['court_complex'] = input('Select a court complex: ')
    config['court_establishment'] = input('Select a court establishment: ')
    config['act'] = input('Select an act: ')
    m.select_court()
    m.goto_acts()
    m.select_act()
    m.handle_table()
    m.driver.quit()

if __name__ == '__main__':
    scrape_courts()
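Reviewer note: a single writer instance is shared across all worker threads above because csv.writer is not safe for concurrent writerow() calls. A minimal standalone sketch of that lock-guarded pattern, separate from the scraper (the class name, demo.csv, and the squares are hypothetical):

    import csv
    import threading
    from concurrent.futures import ThreadPoolExecutor

    class LockedCSVWriter:
        def __init__(self, filename):
            self.file = open(filename, 'w', newline='')
            self.writer = csv.writer(self.file)
            self.lock = threading.Lock()

        def writerow(self, row):
            with self.lock:  # csv.writer is not safe for concurrent writerow calls
                self.writer.writerow(row)

        def close(self):
            self.file.close()

    writer = LockedCSVWriter('demo.csv')  # hypothetical output path
    with ThreadPoolExecutor(max_workers=5) as executor:
        for n in range(20):
            executor.submit(writer.writerow, [n, n * n])
    writer.close()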
@@ -1,0 +1,100 @@
import csv
from time import sleep
from modules.scraper_orders import ScraperOrders
from selenium.webdriver.common.by import By
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

class ThreadSafeCSVWriter:
    def __init__(self, filename):
        self.file = open(filename, 'w', newline='')
        self.writer = csv.writer(self.file)
        self.lock = threading.Lock()

    def writerow(self, row):
        with self.lock:
            self.writer.writerow(row)
            print(f'Wrote: {row}')

    def close(self):
        self.file.close()

def scrape_district(state, district, csv_writer):
    # Construct the scraper outside the try block so the finally clause
    # never references an unbound name.
    config = {}
    scraper = ScraperOrders(config)
    try:
        scraper.close_modal()
        scraper.select('sess_state_code', state)
        scraper.select('sess_dist_code', district)
        complexes = scraper.scrape_complexes()
        scraper.select('court_complex_code', complexes[0])
        sleep(2)
        scraper.goto_courtnumber()
        for cmplx in complexes:
            # Dismiss any error modal left over from the previous selection.
            while True:
                sleep(0.5)
                try:
                    modal_is_open = scraper.driver.find_element(By.CLASS_NAME, 'modal').is_displayed()
                    if modal_is_open:
                        scraper.close_modal()
                        continue
                    break
                except Exception:
                    break
            scraper.select('court_complex_code', cmplx)
            sleep(0.5)
            court_numbers = scraper.get_court_numbers()
            for court_number in court_numbers:
                row = [state, district, cmplx, court_number]
                csv_writer.writerow(row)
    except Exception as e:
        print(f"Error scraping district {district}: {e}")
    finally:
        scraper.driver.quit()  # release the browser even when a district fails

def scrape_courts():
    state = 'Uttar Pradesh'
    config = {}
    scraper = ScraperOrders(config)
    scraper.close_modal()
    scraper.select('sess_state_code', state)
    districts = scraper.scrape_districts()
    scraper.driver.quit()
    csv_writer = ThreadSafeCSVWriter('csv/court_numbers.csv')
    csv_writer.writerow(['State', 'District', 'Complex', 'Court number'])
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(scrape_district, state, district, csv_writer)
            for district in districts
        ]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"A thread encountered an error: {e}")
    csv_writer.close()

def scrape_orders(courts):
    with open(courts, newline='') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            print(row)

if __name__ == '__main__':
    scrape_orders('csv/2023-24_pocso.csv')
@@ -1,221 +1,0 @@
from time import sleep
import os
import uuid
from urllib import request
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support.select import Select
from bs4 import BeautifulSoup
import cv2
import pytesseract
import tempfile

class Scraper:
    def __init__(self, db, config):
        self.db = db
        self.config = config
        options = Options()
        options.add_argument("--headless")
        self.driver = Firefox(options=options)
        self.driver.get('https://services.ecourts.gov.in/ecourtindia_v6/?p=casestatus/index')
        self.current_view = {}

    def close_modal(self):
        sleep(3)
        self.driver.execute_script('closeModel({modal_id:"validateError"})')
        sleep(1)

    def select(self, i_d, value):
        sleep(1)
        element = self.driver.find_element(By.ID, i_d)
        select = Select(element)
        select.select_by_visible_text(value)
        sleep(1)

    def select_act(self):
        self.select('actcode', self.config['act'])
        sleep(1)
        self.driver.find_element(By.ID, 'radDAct').click()
        self.submit_search()

    def scrape_states(self):
        element = self.driver.find_element(By.ID, 'sess_state_code')
        options = Select(element).options
        states = [ option.text for option in options[1:] ]
        print(f'STATES: {states}')
        sleep(0.2)
        return states

    def scrape_districts(self, state):
        self.select('sess_state_code', state)
        sleep(0.2)
        element = self.driver.find_element(By.ID, 'sess_dist_code')
        options = Select(element).options
        districts = [ option.text for option in options[1:] ]
        print(f'DISTRICTS: {districts}')
        return districts

    def scrape_complexes(self, state, district):
        self.select('sess_state_code', state)
        sleep(0.2)
        self.select('sess_dist_code', district)
        sleep(0.2)
        element = self.driver.find_element(By.ID, 'court_complex_code')
        options = Select(element).options
        complexes = [ option.text for option in options[1:] ]
        print(f'COMPLEXES: {complexes}')
        return complexes

    def scrape_establishments(self, state, district, cmplx):
        self.select('sess_state_code', state)
        sleep(0.2)
        self.select('sess_dist_code', district)
        sleep(0.2)
        self.select('court_complex_code', cmplx)
        sleep(1)
        element = self.driver.find_element(By.ID, 'court_est_code')
        options = Select(element).options
        establishments = [ option.text for option in options[1:] ]
        print(f'ESTABLISHMENTS: {establishments}')
        return establishments

    def select_court(self):
        sleep(2)
        while True:
            self.select('sess_state_code', self.config['state'])
            self.select('sess_dist_code', self.config['district'])
            self.select('court_complex_code', self.config['court_complex'])
            sleep(2)
            modal_is_open = self.driver.find_element(By.CLASS_NAME, 'alert-danger-cust').is_displayed()
            if modal_is_open:
                self.close_modal()
                continue
            break
        self.select('court_est_code', self.config['court_establishment'])

    def goto_acts(self):
        element = self.driver.find_element(By.ID, 'act-tabMenu')
        element.click()
        sleep(1)

    def submit_search(self):
        captcha_incomplete = True
        while captcha_incomplete:
            sleep(2)
            img = self.driver.find_element(By.ID, 'captcha_image')
            temp = tempfile.NamedTemporaryFile(suffix='.png')
            img.screenshot(temp.name)
            img = cv2.imread(temp.name)
            text = pytesseract.image_to_string(img).strip()
            element = self.driver.find_element(By.ID, 'act_captcha_code')
            element.send_keys(text)
            self.driver.execute_script('submitAct()')
            sleep(3)
            if self.driver.find_element(By.CLASS_NAME, 'alert-danger-cust').is_displayed():
                self.close_modal()
                element.clear()
            else:
                captcha_incomplete = False

    def handle_table(self):
        table_innerhtml = self.driver.find_element(By.ID, 'dispTable').get_attribute('innerHTML')
        self.rows = BeautifulSoup(str(table_innerhtml), 'html.parser').find_all('td')
        self.views = []
        i = 5
        while i < len(self.rows):
            view = self.rows[i]
            self.current_view = {
                'case_info': self.rows[i-2].get_text(strip=True),
                'petitioner_respondent': ' Vs '.join(self.rows[i-1].get_text(strip=True).split('Vs')),
                'htmlfile': '',
                'pdfs': []
            }
            script = view.find_all('a')[0].get_attribute_list('onclick')[0]
            self.driver.execute_script(script)
            sleep(1)
            html = str(self.driver.find_element(By.ID, 'CSact').get_attribute('innerHTML'))
            while True:
                filename = f"html/{uuid.uuid4().hex}.html"
                if not os.path.exists(filename):
                    break
            self.current_view['htmlfile'] = filename
            with open(filename, "w", encoding="utf-8") as f:
                f.write(html)
            self.parse_orders_table()
            self.db.insert(self.current_view)
            print(f'INSERTED: {self.current_view}')
            self.driver.find_element(By.ID, 'main_back_act').click()
            i += 4

    def parse_orders_table(self):
        try:
            table_innerhtml = self.driver.find_element(By.CLASS_NAME, 'order_table').get_attribute('innerHTML')
        except:
            return
        rows = BeautifulSoup(str(table_innerhtml), 'html.parser').find_all('td')
        self.orders = []
        i = 5
        while i < len(rows):
            self.orders.append(rows[i])
            i += 3
        self.handle_orders()

    def handle_orders(self):
        for order in self.orders:
            script = order.find_all('a')[0].get_attribute_list('onclick')[0]
            self.driver.execute_script(script)
            sleep(2)
            obj = self.driver.find_element(By.TAG_NAME, 'object')
            pdf_url = str(obj.get_attribute('data'))
            while True:
                filename = f"pdf/{uuid.uuid4().hex}.pdf"
                if not os.path.exists(filename):
                    break
            self.current_view['pdfs'].append(filename)
            cookies = "; ".join([f"{c['name']}={c['value']}" for c in self.driver.get_cookies()])
            r = request.Request(pdf_url)
            r.add_header("Cookie", cookies)
            try:
                with request.urlopen(r) as response, open(filename, "wb") as file:
                    file.write(response.read())
            except:
                print(f'UNABLE TO FETCH PDF: {pdf_url}')
            self.driver.find_element(By.ID, 'modalOders').find_element(By.CLASS_NAME, 'btn-close').click()
@@ -1,0 +1,61 @@
from time import sleep
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support.select import Select

class Scraper:
    """Shared base: one headless Firefox session pointed at an eCourts page."""

    def __init__(self, base_url, headless=True):
        options = Options()
        if headless:
            options.add_argument("--headless")
        self.driver = Firefox(options=options)
        self.driver.get(base_url)

    def close_modal(self):
        # The portal shows a validation modal on load; dismiss it via its own JS helper.
        sleep(3)
        self.driver.execute_script('closeModel({modal_id:"validateError"})')
        sleep(1)

    def select(self, i_d, value):
        sleep(1)
        element = self.driver.find_element(By.ID, i_d)
        select = Select(element)
        select.select_by_visible_text(value)
        sleep(1)

    def scrape_states(self):
        element = self.driver.find_element(By.ID, 'sess_state_code')
        options = Select(element).options
        states = [ option.text for option in options[1:] ]  # skip the placeholder option
        print(f'STATES: {states}')
        sleep(0.2)
        return states

    def scrape_districts(self):
        element = self.driver.find_element(By.ID, 'sess_dist_code')
        options = Select(element).options
        districts = [ option.text for option in options[1:] ]
        print(f'DISTRICTS: {districts}')
        return districts

    def scrape_complexes(self):
        element = self.driver.find_element(By.ID, 'court_complex_code')
        options = Select(element).options
        complexes = [ option.text for option in options[1:] ]
        print(f'COMPLEXES: {complexes}')
        return complexes

    def scrape_establishments(self):
        element = self.driver.find_element(By.ID, 'court_est_code')
        options = Select(element).options
        establishments = [ option.text for option in options[1:] if option.text != '' ]
        print(f'ESTABLISHMENTS: {establishments}')
        return establishments
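For review, a minimal sketch of how the new base class is meant to be driven, assuming geckodriver is installed and on PATH; the URL is the case-status one hard-coded in ScraperCaseStatus below:

    from modules.scraper import Scraper

    s = Scraper('https://services.ecourts.gov.in/ecourtindia_v6/?p=casestatus/index')
    s.close_modal()                         # dismiss the validation modal shown on load
    states = s.scrape_states()              # option texts after the placeholder entry
    s.select('sess_state_code', states[0])
    districts = s.scrape_districts()        # the district dropdown repopulates per state
    s.driver.quit()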
@@ -1,0 +1,156 @@
from time import sleep
import os
import uuid
from urllib import request
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from bs4 import BeautifulSoup
import cv2
import pytesseract
import tempfile
from tinydb import TinyDB
from .scraper import Scraper

class ScraperCaseStatus(Scraper):
    def __init__(self, config):
        super().__init__('https://services.ecourts.gov.in/ecourtindia_v6/?p=casestatus/index')
        self.db = TinyDB('db.json')
        self.config = config

    def select_act(self):
        self.select('actcode', self.config['act'])
        sleep(1)
        self.driver.find_element(By.ID, 'radDAct').click()
        self.submit_search()

    def select_court(self):
        sleep(2)
        while True:
            self.select('sess_state_code', self.config['state'])
            self.select('sess_dist_code', self.config['district'])
            self.select('court_complex_code', self.config['court_complex'])
            sleep(2)
            modal_is_open = self.driver.find_element(By.CLASS_NAME, 'alert-danger-cust').is_displayed()
            if modal_is_open:
                self.close_modal()
                continue
            break
        self.select('court_est_code', self.config['court_establishment'])

    def goto_acts(self):
        element = self.driver.find_element(By.ID, 'act-tabMenu')
        element.click()
        sleep(1)

    def submit_search(self):
        # OCR the captcha with pytesseract and retry until the portal accepts it.
        captcha_incomplete = True
        while captcha_incomplete:
            sleep(2)
            img = self.driver.find_element(By.ID, 'captcha_image')
            temp = tempfile.NamedTemporaryFile(suffix='.png')
            img.screenshot(temp.name)
            img = cv2.imread(temp.name)
            text = pytesseract.image_to_string(img).strip()
            element = self.driver.find_element(By.ID, 'act_captcha_code')
            element.send_keys(text)
            self.driver.execute_script('submitAct()')
            sleep(3)
            if self.driver.find_element(By.CLASS_NAME, 'alert-danger-cust').is_displayed():
                self.close_modal()
                element.clear()
            else:
                captcha_incomplete = False

    def handle_table(self):
        table_innerhtml = self.driver.find_element(By.ID, 'dispTable').get_attribute('innerHTML')
        self.rows = BeautifulSoup(str(table_innerhtml), 'html.parser').find_all('td')
        self.views = []
        i = 5
        while i < len(self.rows):
            view = self.rows[i]
            self.current_view = {
                'case_info': self.rows[i-2].get_text(strip=True),
                'petitioner_respondent': ' Vs '.join(self.rows[i-1].get_text(strip=True).split('Vs')),
                'htmlfile': '',
                'pdfs': []
            }
            script = view.find_all('a')[0].get_attribute_list('onclick')[0]
            self.driver.execute_script(script)
            sleep(1)
            html = str(self.driver.find_element(By.ID, 'CSact').get_attribute('innerHTML'))
            while True:
                filename = f"html/{uuid.uuid4().hex}.html"
                if not os.path.exists(filename):
                    break
            self.current_view['htmlfile'] = filename
            with open(filename, "w", encoding="utf-8") as f:
                f.write(html)
            self.parse_orders_table()
            self.db.insert(self.current_view)
            print(f'INSERTED: {self.current_view}')
            self.driver.find_element(By.ID, 'main_back_act').click()
            i += 4

    def parse_orders_table(self):
        try:
            table_innerhtml = self.driver.find_element(By.CLASS_NAME, 'order_table').get_attribute('innerHTML')
        except Exception:
            # This case page has no orders table.
            return
        rows = BeautifulSoup(str(table_innerhtml), 'html.parser').find_all('td')
        self.orders = []
        i = 5
        while i < len(rows):
            self.orders.append(rows[i])
            i += 3
        self.handle_orders()

    def handle_orders(self):
        for order in self.orders:
            script = order.find_all('a')[0].get_attribute_list('onclick')[0]
            self.driver.execute_script(script)
            sleep(2)
            obj = self.driver.find_element(By.TAG_NAME, 'object')
            pdf_url = str(obj.get_attribute('data'))
            while True:
                filename = f"pdf/{uuid.uuid4().hex}.pdf"
                if not os.path.exists(filename):
                    break
            self.current_view['pdfs'].append(filename)
            cookies = "; ".join([f"{c['name']}={c['value']}" for c in self.driver.get_cookies()])
            r = request.Request(pdf_url)
            r.add_header("Cookie", cookies)
            try:
                with request.urlopen(r) as response, open(filename, "wb") as file:
                    file.write(response.read())
            except Exception:
                print(f'UNABLE TO FETCH PDF: {pdf_url}')
            # 'modalOders' is the portal's own (misspelled) element id.
            self.driver.find_element(By.ID, 'modalOders').find_element(By.CLASS_NAME, 'btn-close').click()
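The PDF fetch in handle_orders works because the Selenium session's cookies are forwarded to urllib; extracted here as a standalone helper for clarity (the helper name is hypothetical, the pattern is the one used above):

    from urllib import request

    def download_with_driver_cookies(driver, pdf_url, dest):
        # Reuse the browser session's cookies so the portal serves the file.
        cookies = "; ".join(f"{c['name']}={c['value']}" for c in driver.get_cookies())
        req = request.Request(pdf_url)
        req.add_header("Cookie", cookies)
        with request.urlopen(req) as response, open(dest, "wb") as f:
            f.write(response.read())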
@@ -1,0 +1,29 @@
from time import sleep
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from tinydb import TinyDB
from .scraper import Scraper

class ScraperOrders(Scraper):
    def __init__(self, config):
        super().__init__('https://services.ecourts.gov.in/ecourtindia_v6/?p=courtorder/index')
        self.db = TinyDB('db.json')
        self.config = config

    def goto_courtnumber(self):
        # Switch to the "Court Number" tab on the court-order search page.
        element = self.driver.find_element(By.ID, 'courtnumber-tabMenu')
        element.click()
        sleep(1)

    def get_court_numbers(self):
        element = self.driver.find_element(By.ID, 'nnjudgecode1')
        select = Select(element)
        options = select.options
        court_numbers = [ option.text for option in options ]
        print(f'COURT NUMBERS: {court_numbers}')
        return court_numbers
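A sketch of the intended call sequence for ScraperOrders, mirroring scrape_district() in scrape_orders.py; the state name is only an example, and extra sleeps may be needed between steps as in scrape_district():

    from modules.scraper_orders import ScraperOrders

    scraper = ScraperOrders({})
    scraper.close_modal()
    scraper.select('sess_state_code', 'Uttar Pradesh')   # example state
    scraper.select('sess_dist_code', scraper.scrape_districts()[0])
    scraper.select('court_complex_code', scraper.scrape_complexes()[0])
    scraper.goto_courtnumber()
    print(scraper.get_court_numbers())
    scraper.driver.quit()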