A python script allowing you to access wordpress sites as their own class. One instance per site, for the user more comfortable in a class-centric object-oriented paradigm.
I had wanted to connect to WordPress and do some site updates for clients from directly within python, I couldn’t find any wordpress library that wasn’t a hobby project on someone’s github. They were good but I’d rather make my own, right?
I found robingeuens.com/blog/python-wordpress-api which had everything I needed to know to get started without needing to touch any boring manuals/documentation.
I copied some examples, extended them a little, and tweaked everything to place it within a unifying class, I streamlined authentication and wrote a few additional helper functions. Things like page_id & post_id which can return an elements id by searching for its link, title, or slug (soon these will all be handled via regular expressions, at the moment they’re just literal case insensitive searches similar to your browser’s find function)
I probably could’ve condensed it by using higher order functions to perform both the posts and the pages within one unified method and I probably will on future versions.
import requests
import base64
class WordPress(target):
def __init__(self,domain,user=None,admin=None,wpjson=None):
if wpjson==None:
wpjson=domain+"/wp-json/"
#check wpjson
#wpjson=domain+"index.php/wp-json"
self.wpjson=wpjson
self.domain=domain
self.user=user
self.admin=admin
self.wpadmin=domain+"/wp-admin/"
self.session=None
self.wplogin=domain+"/wp-login.php"
self.gentoken()
self.posts=None
self.pages=None
def displaypages(self):
if self.pages==None:
self.getpages()
for i in self.pages:
print("Title: %s\nLink: %s \nID: %s"%(i['title']['rendered'],i['link'],i['id']))
def displayposts(self):
if self.posts==None:
self.getposts()
for i in self.posts:
print("Title: %s\nLink: %s \nID: %s"%(i['title']['rendered'],i['link'],i['id']))
def gentoken(self):
wordpress_credentials =self.user["username"]+":"+self.user["password"]
self.token = base64.b64encode(wordpress_credentials.encode())
self.wordpress_header = {'Authorization': 'Basic ' + self.token.decode('utf-8')}
def test(self):
#login
#upload a post
#check if the post is present
#remove the post
pass
#upload a page
#check if the page is present
#remove the page
def getposts(self):
api_url = self.wpjson+'wp/v2/posts'
#response = requests.get(api_url)
if self.session==None:
self.login()
response = self.session.get(api_url)
response_header=response.headers
response_json = response.json()
self.posts=response_json
return response_header,response_json
def getpages(self):
api_url = self.wpjson+'wp/v2/pages'
#response = requests.get(api_url)
if self.session==None:
self.login()
response = self.session.get(api_url)
response_header=response.headers
response_json = response.json()
self.pages=response_json
return response_header,response_json
def login(self,redirect=None):
if redirect==None:
redirect=self.admin
with requests.Session() as s:
#self.headers = {'Cookie':"test_cookie=WP Cookie check"}
data={'log':self.user["username"],
'pwd':self.user["password"],
'wp-submit':'Log In',
'redirect_to':redirect}
s.post(self.wplogin,headers=self.wordpress_header,data=data)
self.session=s
#resp=s.get(self.wpadmin)
def create_post(self,content,title="API TEST",slug="test-post"):
api_url = self.wpjson+'/wp/v2/posts'
postdata = {
'title' : title,
'status': 'publish',
'slug' : slug,
'content': content
}
response = self.session.post(api_url,headers=self.wordpress_header, json=postdata)
print("creating post %s"%title)
if response.status_code=='201':
print('success')
else:
print('Failed status code %s'%response.status_code)
return response
def create_page(self,content,title="API TEST",slug="test-page"):
api_url = self.wpjson+'/wp/v2/pages'
postdata = {
'title' : title,
'status': 'publish',
'slug' : slug,
'content': content
}
response = self.session.post(api_url,headers=self.wordpress_header, json=postdata)
print("creating post %s"%title)
if response.status_code==201:
print('success')
else:
print('Failed!\n Status Code:%s'%response.status_code)
return response
def page_id(self,title=None,slug=None,link=None):
if self.pages==None:
self.getpages()
if title==None and link==None and slug==None:
print("Needs title, slug, or link to lookup ID")
for i in self.pages:
if title:
if i['title']['rendered'].lower()==title.lower():
return i['id']
if slug:
if i['slug'].lower()==slug.lower():
return i['id']
if link:
if i['link'].lower()==link.lower():
return i['id']
return None
def post_id(self,title=None,slug=None,link=None):
if self.posts==None:
self.getposts()
if title==None and link==None and slug==None:
print("Needs title, slug, or link to lookup ID")
for i in self.posts:
if i['title']['rendered'].lower()==title.lower():
return i['id']
if i['slug'].lower()==slug.lower():
return i['id']
if i['link'].lower()==link.lower():
return i['id']
return None
def update_post(self,post_id,content,title,slug):
api_url = self.wpjson+'/wp/v2/posts/'
data = {
'title' : title,
'status': 'publish',
'slug' : slug,
'content': content
}
response = requests.post(api_url+str(post_id),headers=self.wordpress_header, json=data)
print("editing post id:%s"%str(post_id))
if response.status_code==200 or response.status_code==201:
print('success')
else:
print('Failed!\n Status Code:%s'%response.status_code)
return response
def update_page(self,page_id,content,title,slug):
api_url = self.wpjson+'/wp/v2/pages/'
data = {
'title' : title,
'status': 'publish',
'slug' : slug,
'content': content
}
response = requests.post(api_url+str(page_id),headers=self.wordpress_header, json=data)
print("editing page id:%s"%str(page_id))
if response.status_code==200 or response.status_code==201:
print('success')
else:
print('Failed!\n Status Code:%s'%response.status_code)
return response
def delete_page(self,page_id):
api_url = self.wpjson+'/wp/v2/pages/'
response = self.session.delete(api_url + page_id, headers=self.wordpress_header)
return response
def delete_post(self,post_id):
api_url = self.wpjson+'/wp/v2/posts/'
response = self.session.delete(api_url + post_id, headers=self.wordpress_header)
return response
Sphinx=Wordpress("https://sphinxcotton.com.au",user=SphinxAcc,wpjson="https://sphinxcotton.com.au/index.php/wp-json/")
#Sphinx.create_page("CONTENT","API TEST",slug="SLUG")
a=Sphinx.upimport requests
import base64
class WordPress(target):
def __init__(self,domain,user=None,admin=None,wpjson=None):
if wpjson==None:
wpjson=domain+"/wp-json/"
#check wpjson
#wpjson=domain+"index.php/wp-json"
self.wpjson=wpjson
self.domain=domain
self.user=user
self.admin=admin
self.wpadmin=domain+"/wp-admin/"
self.session=None
self.wplogin=domain+"/wp-login.php"
self.gentoken()
self.posts=None
self.pages=None
def displaypages(self):
if self.pages==None:
self.getpages()
for i in self.pages:
print("Title: %s\nLink: %s \nID: %s"%(i['title']['rendered'],i['link'],i['id']))
def displayposts(self):
if self.posts==None:
self.getposts()
for i in self.posts:
print("Title: %s\nLink: %s \nID: %s"%(i['title']['rendered'],i['link'],i['id']))
def gentoken(self):
wordpress_credentials =self.user["username"]+":"+self.user["password"]
self.token = base64.b64encode(wordpress_credentials.encode())
self.wordpress_header = {'Authorization': 'Basic ' + self.token.decode('utf-8')}
def test(self):
#login
#upload a post
#check if the post is present
#remove the post
pass
#upload a page
#check if the page is present
#remove the page
def getposts(self):
api_url = self.wpjson+'wp/v2/posts'
#response = requests.get(api_url)
if self.session==None:
self.login()
response = self.session.get(api_url)
response_header=response.headers
response_json = response.json()
self.posts=response_json
return response_header,response_json
def getpages(self):
api_url = self.wpjson+'wp/v2/pages'
#response = requests.get(api_url)
if self.session==None:
self.login()
response = self.session.get(api_url)
response_header=response.headers
response_json = response.json()
self.pages=response_json
return response_header,response_json
def login(self,redirect=None):
if redirect==None:
redirect=self.admin
with requests.Session() as s:
#self.headers = {'Cookie':"test_cookie=WP Cookie check"}
data={'log':self.user["username"],
'pwd':self.user["password"],
'wp-submit':'Log In',
'redirect_to':redirect}
s.post(self.wplogin,headers=self.wordpress_header,data=data)
self.session=s
#resp=s.get(self.wpadmin)
def create_post(self,content,title="API TEST",slug="test-post"):
api_url = self.wpjson+'/wp/v2/posts'
postdata = {
'title' : title,
'status': 'publish',
'slug' : slug,
'content': content
}
response = self.session.post(api_url,headers=self.wordpress_header, json=postdata)
print("creating post %s"%title)
if response.status_code=='201':
print('success')
else:
print('Failed status code %s'%response.status_code)
return response
def create_page(self,content,title="API TEST",slug="test-page"):
api_url = self.wpjson+'/wp/v2/pages'
postdata = {
'title' : title,
'status': 'publish',
'slug' : slug,
'content': content
}
response = self.session.post(api_url,headers=self.wordpress_header, json=postdata)
print("creating post %s"%title)
if response.status_code==201:
print('success')
else:
print('Failed!\n Status Code:%s'%response.status_code)
return response
def page_id(self,title=None,slug=None,link=None):
if self.pages==None:
self.getpages()
if title==None and link==None and slug==None:
print("Needs title, slug, or link to lookup ID")
for i in self.pages:
if title:
if i['title']['rendered'].lower()==title.lower():
return i['id']
if slug:
if i['slug'].lower()==slug.lower():
return i['id']
if link:
if i['link'].lower()==link.lower():
return i['id']
return None
def post_id(self,title=None,slug=None,link=None):
if self.posts==None:
self.getposts()
if title==None and link==None and slug==None:
print("Needs title, slug, or link to lookup ID")
for i in self.posts:
if i['title']['rendered'].lower()==title.lower():
return i['id']
if i['slug'].lower()==slug.lower():
return i['id']
if i['link'].lower()==link.lower():
return i['id']
return None
def update_post(self,post_id,content,title,slug):
api_url = self.wpjson+'/wp/v2/posts/'
data = {
'title' : title,
'status': 'publish',
'slug' : slug,
'content': content
}
response = requests.post(api_url+str(post_id),headers=self.wordpress_header, json=data)
print("editing post id:%s"%str(post_id))
if response.status_code==200 or response.status_code==201:
print('success')
else:
print('Failed!\n Status Code:%s'%response.status_code)
return response
def update_page(self,page_id,content,title,slug):
api_url = self.wpjson+'/wp/v2/pages/'
data = {
'title' : title,
'status': 'publish',
'slug' : slug,
'content': content
}
response = requests.post(api_url+str(page_id),headers=self.wordpress_header, json=data)
print("editing page id:%s"%str(page_id))
if response.status_code==200 or response.status_code==201:
print('success')
else:
print('Failed!\n Status Code:%s'%response.status_code)
return response
def delete_page(self,page_id):
api_url = self.wpjson+'/wp/v2/pages/'
response = self.session.delete(api_url + page_id, headers=self.wordpress_header)
return response
def delete_post(self,post_id):
api_url = self.wpjson+'/wp/v2/posts/'
response = self.session.delete(api_url + post_id, headers=self.wordpress_header)
return response
MY_SITEAcc={"username":"MY_SITEAdmin","password":"Generate an app password in your wordpress, it's a safety thing."}
MY_SITE=Wordpress("https://MY_SITE.com.au",user=MY_SITEAcc,wpjson="https://MY_SITE.com.au/wp-json/")#initialise the wordpress object with me site, my user account, and a link to my wp-json
#MY_SITE.create_page("CONTENT","API TEST",slug="SLUG")
#a=MY_SITE.update_post(MY_SITE.post_id(title='landing page'),content="landing page updated via API",title='landing page',slug='landing-page-updated')
#a=MY_SITE.create_page("CONTENT","TITLE",slug="SLUG")
#a=MY_SITE.create_post("New post, created from Python via the REST API")
0