Kesav Kolla
08/27/2025, 11:26 AM
Garrett Weaver
08/27/2025, 6:18 PM
daft.func vs daft.udf? I would guess that if the underlying Python code is not taking advantage of any vectorization, but is maybe just a list comprehension [my_func(x) for x in some_series], then just use daft.func?
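A minimal sketch of the distinction being asked about, assuming daft.func infers the return dtype from the type annotation while daft.udf receives whole Series batches (function names here are illustrative; verify against your Daft version):

import daft
from daft import DataType, Series

# Row-wise: the function is called once per row; dtype inferred from the annotation.
@daft.func
def double(x: int) -> int:
    return x * 2

# Batch: the function receives a whole Series; worthwhile when you can vectorize.
@daft.udf(return_dtype=DataType.int64())
def double_batch(xs: Series):
    return [x * 2 for x in xs.to_pylist()]

df = daft.from_pydict({"x": [1, 2, 3]})
df.select(double(df["x"]), double_batch(df["x"])).show()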
Garrett Weaver
08/28/2025, 4:21 PM
VOID 001
08/29/2025, 3:55 AM
df = daft.from_pydict({
"json": [
'{"a": 1, "b": 2}',
'{"a": 3, "b": 4}',
],
})
df = daft.sql("SELECT json.* FROM df")
df.collect()
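If the goal is pulling individual fields out of JSON strings, a jq-style query expression may be an alternative; a sketch assuming the .json.query() expression namespace (exact API may vary across Daft versions):

import daft

df = daft.from_pydict({"json": ['{"a": 1, "b": 2}', '{"a": 3, "b": 4}']})
# Extract field "a" from each JSON string with a jq-style filter
df = df.with_column("a", df["json"].json.query(".a"))
df.show()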
Amir Shukayev
08/29/2025, 4:01 AM
Is concat lazy? Like
df = reduce(
    lambda df1, df2: df1.concat(df2),
    [
        df_provider[i].get_daft_df()
        for i in range(num_dfs)
    ],
)
Is there any way to lazily combine a set of dfs, in any order?
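A quick way to check: Daft DataFrames build a lazy logical plan, so chained concat calls should compose without executing anything until collect()/show(). A minimal sketch:

import daft
from functools import reduce

dfs = [daft.from_pydict({"x": [i]}) for i in range(4)]
combined = reduce(lambda a, b: a.concat(b), dfs)
combined.explain()   # inspect the combined plan; nothing has executed yet
print(combined.collect().to_pydict())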
Sky Yin
08/29/2025, 10:31 PM
Garrett Weaver
09/04/2025, 8:41 PM
get_next_partition is running there.
Desmond Cheong
09/04/2025, 11:58 PM
VOID 001
09/05/2025, 5:56 AM
Peer Schendel
09/07/2025, 9:10 AM
import os
from datetime import datetime  # needed for the expiration print below

from openai import AzureOpenAI

client = AzureOpenAI(
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version="2025-03-01-preview",
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
)
# Upload a file with a purpose of "batch"
file = client.files.create(
    file=open("test.jsonl", "rb"),
    purpose="batch",
    # Optional: expires_after accepts 1209600-2592000 seconds (14-30 days)
    extra_body={"expires_after": {"seconds": 1209600, "anchor": "created_at"}},
)
print(file.model_dump_json(indent=2))
print(f"File expiration: {datetime.fromtimestamp(file.expires_at) if file.expires_at is not None else 'Not set'}")
file_id = file.id
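A possible next step, not shown in the thread: submitting the batch job against the uploaded file, reusing client and file_id from the snippet above. The endpoint string and completion window follow the Azure OpenAI batch docs as we understand them, so verify against your API version:

# Submit the batch job referencing the uploaded file
batch = client.batches.create(
    input_file_id=file_id,
    endpoint="/chat/completions",
    completion_window="24h",
)
print(batch.id, batch.status)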
Edmondo Porcu
09/07/2025, 4:17 PM
ChanChan Mao
09/08/2025, 5:29 PM
ChanChan Mao
09/09/2025, 6:23 PM
Kyle
09/11/2025, 5:04 AM
Edmondo Porcu
09/12/2025, 6:36 PM
Rakesh Jain
09/12/2025, 10:15 PM
Kyle
09/15/2025, 6:22 AM
吕威
09/16/2025, 7:22 AM
from typing import List

import cv2
import numpy as np

from daft import DataType, Series, udf

@udf(
    return_dtype=DataType.list(
        DataType.struct(
            {
                "class": DataType.string(),
                "score": DataType.float64(),
                "cropped_img": DataType.image(),
                "bbox": DataType.list(DataType.int64()),
            }
        )
    ),
    num_gpus=1,
    batch_size=16,
)
class YOLOWorldOnnxObjDetect:
    def __init__(
        self,
        model_path: str,
        device: str = "cuda:0",
        confidence: float = 0.25,
    ):
        # init model
        pass

    def __call__(self, images_2d_col: Series) -> List[List[dict]]:
        images: List[np.ndarray] = images_2d_col.to_pylist()
        results = self.yolo.predict(source=images, conf=self.confidence)
        objs = []
        for r in results:
            img_result = []
            orig_img = r.orig_img
            for box in r.boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(orig_img.shape[1], x2), min(orig_img.shape[0], y2)
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                cls = int(box.cls[0])
                img_result.append(
                    {
                        "class": self.yolo.names[cls],
                        "score": float(box.conf[0]),
                        "cropped_img": {
                            "cropimg": cv2.cvtColor(
                                orig_img[y1:y2, x1:x2], cv2.COLOR_BGR2RGB
                            ),
                        },
                        "bbox": [x1, y1, x2, y2],
                    }
                )
            objs.append(img_result)
        return objs
The cropped_img value must be returned as a dict; if I return the np.ndarray directly, it raises a "Could not convert array(..., dtype=uint8) with type numpy.ndarray: was expecting tuple of (key, value) pair" error. Why?
ChanChan Mao
09/22/2025, 4:00 PM
ChanChan Mao
09/23/2025, 12:33 AM
daft.File datatype. Following that, @Colin Ho will dive into his work on Flotilla, our distributed engine, and showcase some exciting benchmark results 👀 We'll leave plenty of time at the end for questions and discussions.
Add to your calendar and we'll see you then! 👋
Garrett Weaver
09/23/2025, 10:28 PM
Amir Shukayev
09/24/2025, 11:38 PM
Nathan Cai
09/24/2025, 11:59 PM
Should it be
# Supply actual values for the s3
not
# Supply actual values for the se
in the docs?
https://docs.daft.ai/en/stable/connectors/aws/#rely-on-environment
from daft.io import IOConfig, S3Config
# Supply actual values for the se
io_config = IOConfig(s3=S3Config(key_id="key_id", session_token="session_token", secret_key="secret_key"))
# Globally set the default IOConfig for any subsequent I/O calls
daft.set_planning_config(default_io_config=io_config)
# Perform some I/O operation
df = daft.read_parquet("s3://my_bucket/my_path/**/*")
ChanChan Mao
09/25/2025, 11:01 PM
Garrett Weaver
09/26/2025, 4:58 AM
With use_process=True, everything works fine locally (mac), but seeing /usr/bin/bash: line 1: 58 Bus error (core dumped) when run on k8s (argo workflows). Any thoughts?
Garrett Weaver
09/26/2025, 5:11 PM
Using explode with the average result being 1 row --> 12 rows and max 1 row --> 366 rows (~5M rows --> ~66M rows). Seeing decently high memory usage during the explode even with a repartition prior to the explode. Is the only remedy more partitions and/or a reduced number of CPUs to reduce parallelism?
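A minimal sketch of the mitigation under discussion, with hypothetical column names: raise the partition count before the explode so each exploded partition stays smaller.

import daft

df = daft.from_pydict({"id": [1, 2], "values": [[1, 2, 3], list(range(12))]})
df = df.into_partitions(8)   # more, smaller partitions before exploding
df = df.explode("values")
df.collect()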
Nathan Cai
09/29/2025, 1:00 PM
Robert Howell
09/30/2025, 4:39 PM
Dan Reverri
09/30/2025, 9:50 PM
ChanChan Mao
10/03/2025, 9:35 PM