PyUNO での InputStream

以前は何度試してもうまくいかなかった気がするが、Python で実装した InputStream から Writer ドキュメントに HTML ソースを挿入してみた。今回はなぜか動作する。以前のソースが見つからないので、どこが違うのかは何とも言えない。

EOF などの処理をする必要あり。

import sys
sys.path.append('/home/asuka/local/opt/ooo-dev/basis3.3/program')
import uno

# Fixed HTML page wrapper: the fragment to insert is concatenated between
# html_header and html_footer. The META tag declares the content as UTF-8,
# which matches the encoding applied to the assembled page before it is
# wrapped in a stream.
html_header = """<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.0 Transitional//EN\">
	<html>
	<head>
		<META HTTP-EQUIV="CONTENT-TYPE" CONTENT="text/html; charset=utf-8">
	</head>
	<body>"""
html_footer = "</body></html>"


import unohelper
from uno import ByteSequence
from com.sun.star.io import XInputStream, XSeekable, XSeekableInputStream


class PyTextInputStream(unohelper.Base,
	XInputStream, XSeekable, XSeekableInputStream):
	"""In-memory, seekable UNO input stream backed by a byte string.

	Implements the com.sun.star.io XInputStream and XSeekable methods in
	Python so that a buffer held in this process can be passed to an office
	API expecting an input stream.

	The read position is always kept within ``0 .. length``. Reading at the
	end of the data returns an empty sequence, which is how end of stream is
	reported to the consumer.
	"""

	def __init__(self, text):
		"""Wrap *text* as a stream positioned at its first byte.

		text -- the payload as a byte string (for example the result of
		        ``some_text.encode('utf-8')``); slices of it are handed
		        to ByteSequence unchanged.
		"""
		self.data = text
		self.length = len(self.data)
		self.pos = 0
		self.opened = True

	def _read(self, n):
		"""Return at most *n* bytes from the current position.

		The position advances only by the number of bytes actually
		returned, so it never runs past the end of the data. A negative
		*n* is treated as zero.
		"""
		end = min(self.pos + max(n, 0), self.length)
		chunk = self.data[self.pos:end]
		self.pos = end
		return chunk

	# XInputStream
	def readBytes(self, d, n):
		"""Read up to *n* bytes.

		*d* stands in for the UNO out parameter and is ignored. Returns a
		pair ``(count, ByteSequence)``: PyUNO maps the first item to the
		method's return value and the second to the out parameter. At end
		of stream the pair is ``(0, <empty sequence>)``.
		"""
		chunk = self._read(n)
		return len(chunk), ByteSequence(chunk)

	def readSomeBytes(self, d, n):
		"""Same as readBytes: all data is in memory, so nothing can block."""
		chunk = self._read(n)
		return len(chunk), ByteSequence(chunk)

	def skipBytes(self, n):
		"""Advance the position by *n* bytes, stopping at the end of data."""
		self.pos = min(self.pos + max(n, 0), self.length)

	def available(self):
		"""Return the number of bytes left between the position and the end."""
		return max(self.length - self.pos, 0)

	def closeInput(self):
		"""Mark the stream as closed (only the ``opened`` flag is changed)."""
		self.opened = False

	# XSeekable
	def seek(self, n):
		"""Move the position to absolute offset *n*.

		Raises com.sun.star.lang.IllegalArgumentException when *n* is
		negative or lies beyond the end of the data.
		"""
		if n < 0 or n > self.length:
			# Imported here so the class only needs this UNO type on the
			# error path.
			from com.sun.star.lang import IllegalArgumentException
			raise IllegalArgumentException(
				"seek position out of range: %s" % n, self, 0)
		self.pos = n

	def getPosition(self):
		"""Return the current absolute read position."""
		return self.pos

	def getLength(self):
		"""Return the total size of the wrapped data in bytes."""
		return self.length


def input_test(ctx):
	"""Append a small HTML fragment to the end of the current document.

	A complete HTML page is assembled from html_header, a bold "123"
	fragment and html_footer, encoded as UTF-8 and wrapped in a
	PyTextInputStream. The stream is then passed, together with the
	"writer_web_HTML" filter name, to insertDocumentFromURL on the end of
	the document text.

	ctx -- the component context used to reach the service manager and
	       the Desktop service.

	NOTE(review): assumes the desktop's current component is a text
	document that provides getText() -- confirm before reuse.
	"""
	service_manager = ctx.getServiceManager()
	from com.sun.star.beans import PropertyValue

	# Assemble the page and expose it through the Python-implemented stream.
	# An alternative tried here earlier is the built-in
	# com.sun.star.io.SequenceInputStream service, initialized with a
	# uno.ByteSequence holding the same UTF-8 bytes.
	fragment = '<b>123</b>'
	page = html_header + fragment + html_footer
	stream = PyTextInputStream(page.encode('utf-8'))

	# Import arguments: where the data comes from and which filter reads it.
	stream_arg = PropertyValue()
	stream_arg.Name = "InputStream"
	stream_arg.Value = stream

	filter_arg = PropertyValue()
	filter_arg.Name = "FilterName"
	filter_arg.Value = "writer_web_HTML"

	desktop = service_manager.createInstanceWithContext(
		"com.sun.star.frame.Desktop", ctx)
	document = desktop.getCurrentComponent()

	# The URL is left empty; the content comes from the "InputStream"
	# argument instead.
	insertion_point = document.getText().getEnd()
	insertion_point.insertDocumentFromURL("", (stream_arg, filter_arg))
	stream.closeInput()

	print("done.")


def connect(host="localhost", port=2083):
	"""Connect to a running office instance and return its component context.

	host -- host name of the office's socket acceptor (default "localhost").
	port -- port of the socket acceptor (default 2083).

	Returns the remote component context resolved through
	com.sun.star.bridge.UnoUrlResolver, or None when any step of the
	connection attempt fails.
	"""
	try:
		localctx = uno.getComponentContext()
		resolver = localctx.ServiceManager.createInstanceWithContext(
			"com.sun.star.bridge.UnoUrlResolver", localctx)
		return resolver.resolve(
			"uno:socket,host=%s,port=%s;urp;StarOffice.ComponentContext"
			% (host, port))
	except Exception:
		# Deliberately broad: every connection failure is reported as None.
		# Unlike a bare "except:", this lets KeyboardInterrupt and SystemExit
		# propagate.
		return None
 
 
if __name__ == "__main__":
	# Script entry point: connect to the office, then run the stream test.
	ctx = connect()
	if not ctx:
		print("no connection availabled.")
	else:
		input_test(ctx)
		# The result is intentionally discarded.
		# NOTE(review): presumably a cheap synchronous call made so that
		# pending remote calls are flushed before the process exits --
		# confirm before removing.
		ctx.getServiceManager()